[S-RBAC] Fix new policies for FIP PFs APIs

During transition to the new secure RBAC API policies, we made mistake
with policies for FIP PFs by defining them to be available for
ADMIN_OR_PROJECT_MEMBER/READER or FIP owner.
First, rule PROJECT_MEMBER/READER is not appropriate in this case as FIP PFs
don't have tenant_id attribute at all and belongs to the owner of FIP always.
Second issue was that any FIP owner, even with just READER role could possibly
e.g. create port forwarding.

To fix that, this patch changes those API policies to new rules:
ADMIN_OR_PARENT_OWNER_READER
ADMIN_OR_PARENT_OWNER_MEMBER

Conflicts:
    neutron/conf/policies/floatingip_port_forwarding.py

Closes-Bug: #2018989
Change-Id: Ibff4c4f5b6d020fd598831a8a6e8ec0e2f559005
(cherry picked from commit 4edff4fe8d)
This commit is contained in:
Slawek Kaplonski 2023-05-09 12:54:28 +02:00 committed by Rodolfo Alonso Hernandez
parent c82eee0fd6
commit 812526a279
2 changed files with 209 additions and 154 deletions

View File

@ -29,9 +29,7 @@ RESOURCE_PATH = ('/floatingips/{floatingip_id}'
rules = [
policy.DocumentedRuleDefault(
name='create_floatingip_port_forwarding',
check_str=base.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.RULE_PARENT_OWNER),
check_str=base.ADMIN_OR_PARENT_OWNER_MEMBER,
scope_types=['project'],
description='Create a floating IP port forwarding',
operations=[
@ -48,9 +46,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_floatingip_port_forwarding',
check_str=base.policy_or(
base.ADMIN_OR_PROJECT_READER,
base.RULE_PARENT_OWNER),
check_str=base.ADMIN_OR_PARENT_OWNER_READER,
scope_types=['project'],
description='Get a floating IP port forwarding',
operations=[
@ -71,9 +67,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_floatingip_port_forwarding',
check_str=base.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.RULE_PARENT_OWNER),
check_str=base.ADMIN_OR_PARENT_OWNER_MEMBER,
scope_types=['project'],
description='Update a floating IP port forwarding',
operations=[
@ -90,9 +84,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_floatingip_port_forwarding',
check_str=base.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.RULE_PARENT_OWNER),
check_str=base.ADMIN_OR_PARENT_OWNER_MEMBER,
scope_types=['project'],
description='Delete a floating IP port forwarding',
operations=[

View File

@ -28,16 +28,19 @@ class FloatingipPortForwardingAPITestCase(base.PolicyBaseTestCase):
super(FloatingipPortForwardingAPITestCase, self).setUp()
self.fip = {
'id': uuidutils.generate_uuid(),
'tenant_id': self.project_id,
'project_id': self.project_id}
self.alt_fip = {
'id': uuidutils.generate_uuid(),
'tenant_id': self.alt_project_id,
'project_id': self.alt_project_id}
self.target = {
'project_id': self.project_id,
'floatingip_id': self.fip['id'],
'ext_parent_floatingip_id': self.fip['id']}
self.alt_target = {
'project_id': self.alt_project_id,
'floatingip_id': self.fip['id'],
'ext_parent_floatingip_id': self.fip['id']}
'floatingip_id': self.alt_fip['id'],
'ext_parent_floatingip_id': self.alt_fip['id']}
self.plugin_mock = mock.Mock()
self.plugin_mock.get_floatingip.return_value = self.fip
@ -53,52 +56,68 @@ class SystemAdminTests(FloatingipPortForwardingAPITestCase):
self.context = self.system_admin_ctx
def test_create_fip_pf(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.alt_target)
def test_get_fip_pf(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.alt_target)
def test_update_fip_pf(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.alt_target)
def test_delete_fip_pf(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)
class SystemMemberTests(SystemAdminTests):
@ -122,44 +141,60 @@ class AdminTests(FloatingipPortForwardingAPITestCase):
self.context = self.project_admin_ctx
def test_create_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'create_floatingip_port_forwarding',
self.target))
self.assertTrue(
policy.enforce(self.context,
'create_floatingip_port_forwarding',
self.alt_target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'create_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertTrue(
policy.enforce(self.context,
'create_floatingip_port_forwarding',
self.alt_target))
def test_get_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'get_floatingip_port_forwarding',
self.target))
self.assertTrue(
policy.enforce(self.context,
'get_floatingip_port_forwarding',
self.alt_target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'get_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertTrue(
policy.enforce(self.context,
'get_floatingip_port_forwarding',
self.alt_target))
def test_update_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'update_floatingip_port_forwarding',
self.target))
self.assertTrue(
policy.enforce(self.context,
'update_floatingip_port_forwarding',
self.alt_target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'update_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertTrue(
policy.enforce(self.context,
'update_floatingip_port_forwarding',
self.alt_target))
def test_delete_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'delete_floatingip_port_forwarding',
self.target))
self.assertTrue(
policy.enforce(self.context,
'delete_floatingip_port_forwarding',
self.alt_target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'delete_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertTrue(
policy.enforce(self.context,
'delete_floatingip_port_forwarding',
self.alt_target))
class ProjectMemberTests(AdminTests):
@ -169,48 +204,64 @@ class ProjectMemberTests(AdminTests):
self.context = self.project_member_ctx
def test_create_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'create_floatingip_port_forwarding',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'create_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.alt_target)
def test_get_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'get_floatingip_port_forwarding',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'get_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.alt_target)
def test_update_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'update_floatingip_port_forwarding',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'update_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.alt_target)
def test_delete_fip_pf(self):
self.assertTrue(
policy.enforce(self.context,
'delete_floatingip_port_forwarding',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertTrue(
policy.enforce(self.context,
'delete_floatingip_port_forwarding',
self.target))
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
@ -220,37 +271,49 @@ class ProjectReaderTests(ProjectMemberTests):
self.context = self.project_reader_ctx
def test_create_fip_pf(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.alt_target)
def test_update_fip_pf(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.alt_target)
def test_delete_fip_pf(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.target)
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.alt_fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)