[S-RBAC] Fix tests for port RBAC policies

Exisitng tests didn't mock "get_network" function properly thus
there was no "network:tenant_id" field in the target object set and
OwnerCheck check was failing for the "net_owner".
Unit tests were green because they badly expected e.g. for the
ProjectMember that e.g. port with given fixed_ip or mac_address will not
be allowed (exception was raised because net_owner check failed) even
thoug it should be possible through the net_owner rule in policy.

This patch fixes mocking get_network function in unit tests module for
the port RBAC policies and it also updates unit tests accordingly to
make them pass.

Change-Id: I4c26403e237afdb8934dda65fef3b12c99a5f689
Signed-off-by: Slawek Kaplonski <skaplons@redhat.com>
(cherry picked from commit a48b715e8b)
This commit is contained in:
Slawek Kaplonski
2025-09-24 16:46:32 +02:00
parent 0eca108015
commit 1555e14677

View File

@@ -29,20 +29,33 @@ class PortAPITestCase(base.PolicyBaseTestCase):
self.network = { self.network = {
'id': uuidutils.generate_uuid(), 'id': uuidutils.generate_uuid(),
'tenant_id': self.project_id,
'project_id': self.project_id} 'project_id': self.project_id}
self.target = { self.alt_network = {
'project_id': self.project_id, 'id': uuidutils.generate_uuid(),
'tenant_id': self.alt_project_id, 'tenant_id': self.alt_project_id,
'project_id': self.alt_project_id}
self.target = {
'tenant_id': self.project_id,
'project_id': self.project_id,
'network_id': self.network['id'], 'network_id': self.network['id'],
'ext_parent_network_id': self.network['id']} 'ext_parent_network_id': self.network['id']}
self.alt_target = { self.alt_target = {
'tenant_id': self.project_id,
'project_id': self.alt_project_id, 'project_id': self.alt_project_id,
'tenant_id': self.alt_project_id, 'network_id': self.alt_network['id'],
'network_id': self.network['id'], 'ext_parent_network_id': self.alt_network['id']}
'ext_parent_network_id': self.network['id']}
networks = {
self.network['id']: self.network,
self.alt_network['id']: self.alt_network,
}
def get_network(context, id, fields=None):
return networks[id]
self.plugin_mock = mock.Mock() self.plugin_mock = mock.Mock()
self.plugin_mock.get_network.return_value = self.network self.plugin_mock.get_network.side_effect = get_network
mock.patch( mock.patch(
'neutron_lib.plugins.directory.get_plugin', 'neutron_lib.plugins.directory.get_plugin',
return_value=self.plugin_mock).start() return_value=self.plugin_mock).start()
@@ -814,10 +827,8 @@ class ProjectManagerTests(AdminTests):
target['device_owner'] = 'network:test' target['device_owner'] = 'network:test'
alt_target = self.alt_target.copy() alt_target = self.alt_target.copy()
alt_target['device_owner'] = 'network:test' alt_target['device_owner'] = 'network:test'
self.assertRaises( self.assertTrue(
base_policy.PolicyNotAuthorized, policy.enforce(self.context, 'create_port:device_owner', target))
policy.enforce, self.context, 'create_port:device_owner',
target)
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:device_owner', policy.enforce, self.context, 'create_port:device_owner',
@@ -1056,10 +1067,8 @@ class ProjectManagerTests(AdminTests):
target['device_owner'] = 'network:test' target['device_owner'] = 'network:test'
alt_target = self.alt_target.copy() alt_target = self.alt_target.copy()
alt_target['device_owner'] = 'network:test' alt_target['device_owner'] = 'network:test'
self.assertRaises( self.assertTrue(
base_policy.PolicyNotAuthorized, policy.enforce(self.context, 'update_port:device_owner', target))
policy.enforce, self.context, 'update_port:device_owner',
target)
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:device_owner', policy.enforce, self.context, 'update_port:device_owner',
@@ -1222,6 +1231,199 @@ class ProjectMemberTests(ProjectManagerTests):
super().setUp() super().setUp()
self.context = self.project_member_ctx self.context = self.project_member_ctx
def test_create_port_with_device_owner(self):
target = self.target.copy()
target['device_owner'] = 'network:test'
alt_target = self.alt_target.copy()
alt_target['device_owner'] = 'network:test'
self.assertTrue(
policy.enforce(self.context, 'create_port:device_owner', target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:device_owner',
alt_target)
def test_create_port_with_mac_address(self):
self.assertTrue(
policy.enforce(self.context, 'create_port:mac_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:mac_address',
self.alt_target)
def test_create_port_with_fixed_ips(self):
self.assertTrue(
policy.enforce(self.context, 'create_port:fixed_ips', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips',
self.alt_target)
def test_create_port_with_fixed_ips_and_ip_address(self):
self.assertTrue(
policy.enforce(self.context, 'create_port:fixed_ips:ip_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.alt_target)
def test_create_port_with_fixed_ips_and_subnet_id(self):
self.assertTrue(
policy.enforce(self.context, 'create_port:fixed_ips:subnet_id',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:subnet_id',
self.alt_target)
def test_create_port_with_port_security_enabled(self):
self.assertTrue(
policy.enforce(self.context, 'create_port:port_security_enabled',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:port_security_enabled',
self.alt_target)
def test_create_port_with_allowed_address_pairs(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:allowed_address_pairs',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs',
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_mac_address(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:allowed_address_pairs:mac_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:mac_address',
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_ip_address(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:allowed_address_pairs:ip_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:ip_address',
self.alt_target)
def test_update_port_with_device_owner(self):
target = self.target.copy()
target['device_owner'] = 'network:test'
alt_target = self.alt_target.copy()
alt_target['device_owner'] = 'network:test'
self.assertTrue(
policy.enforce(self.context, 'update_port:device_owner', target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:device_owner',
alt_target)
def test_update_port_with_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:mac_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:mac_address',
self.alt_target)
def test_update_port_with_fixed_ips(self):
self.assertTrue(
policy.enforce(self.context, 'update_port:fixed_ips', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips',
self.alt_target)
def test_update_port_with_fixed_ips_and_ip_address(self):
self.assertTrue(
policy.enforce(self.context, 'update_port:fixed_ips', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:ip_address',
self.alt_target)
def test_update_port_with_fixed_ips_and_subnet_id(self):
self.assertTrue(
policy.enforce(self.context, 'update_port:fixed_ips', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:subnet_id',
self.alt_target)
def test_update_port_with_port_security_enabled(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:port_security_enabled',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:port_security_enabled',
self.alt_target)
def test_update_port_with_allowed_address_pairs(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:allowed_address_pairs',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs',
self.alt_target)
def test_update_port_with_allowed_address_pairs_and_mac_address(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:allowed_address_pairs:mac_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs:mac_address',
self.alt_target)
def test_update_port_with_allowed_address_pairs_and_ip_address(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:allowed_address_pairs:ip_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs:ip_address',
self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super().setUp()
self.context = self.project_reader_ctx
def test_create_port(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.alt_target)
def test_create_port_with_device_owner(self): def test_create_port_with_device_owner(self):
target = self.target.copy() target = self.target.copy()
target['device_owner'] = 'network:test' target['device_owner'] = 'network:test'
@@ -1256,16 +1458,6 @@ class ProjectMemberTests(ProjectManagerTests):
policy.enforce, self.context, 'create_port:fixed_ips', policy.enforce, self.context, 'create_port:fixed_ips',
self.alt_target) self.alt_target)
def test_create_port_with_fixed_ips_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.alt_target)
def test_create_port_with_fixed_ips_and_subnet_id(self): def test_create_port_with_fixed_ips_and_subnet_id(self):
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
@@ -1322,6 +1514,42 @@ class ProjectMemberTests(ProjectManagerTests):
self.context, 'create_port:allowed_address_pairs:ip_address', self.context, 'create_port:allowed_address_pairs:ip_address',
self.alt_target) self.alt_target)
def test_create_port_with_fixed_ips_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.alt_target)
def test_create_port_with_binding_vnic_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type',
self.alt_target)
def test_create_port_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:tags', self.alt_target)
def test_update_port(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port', self.alt_target)
def test_update_port_with_device_owner(self): def test_update_port_with_device_owner(self):
target = self.target.copy() target = self.target.copy()
target['device_owner'] = 'network:test' target['device_owner'] = 'network:test'
@@ -1336,46 +1564,6 @@ class ProjectMemberTests(ProjectManagerTests):
policy.enforce, self.context, 'update_port:device_owner', policy.enforce, self.context, 'update_port:device_owner',
alt_target) alt_target)
def test_update_port_with_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:mac_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:mac_address',
self.alt_target)
def test_update_port_with_fixed_ips(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips',
self.alt_target)
def test_update_port_with_fixed_ips_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:ip_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:ip_address',
self.alt_target)
def test_update_port_with_fixed_ips_and_subnet_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:subnet_id',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:subnet_id',
self.alt_target)
def test_update_port_with_port_security_enabled(self): def test_update_port_with_port_security_enabled(self):
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
@@ -1422,46 +1610,41 @@ class ProjectMemberTests(ProjectManagerTests):
self.context, 'update_port:allowed_address_pairs:ip_address', self.context, 'update_port:allowed_address_pairs:ip_address',
self.alt_target) self.alt_target)
def test_update_port_with_fixed_ips(self):
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super().setUp()
self.context = self.project_reader_ctx
def test_create_port(self):
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.target) policy.enforce,
self.assertRaises( self.context, 'update_port:fixed_ips',
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.alt_target)
def test_create_port_with_binding_vnic_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type',
self.target) self.target)
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type', policy.enforce,
self.context, 'update_port:fixed_ips',
self.alt_target) self.alt_target)
def test_create_port_tags(self): def test_update_port_with_fixed_ips_and_ip_address(self):
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:tags', self.target) policy.enforce,
self.context, 'update_port:fixed_ips:ip_address',
self.target)
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:tags', self.alt_target) policy.enforce,
self.context, 'update_port:fixed_ips:ip_address',
self.alt_target)
def test_update_port(self): def test_update_port_with_fixed_ips_and_subnet_id(self):
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port', self.target) policy.enforce,
self.context, 'update_port:fixed_ips:subnet_id',
self.target)
self.assertRaises( self.assertRaises(
base_policy.PolicyNotAuthorized, base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port', self.alt_target) policy.enforce,
self.context, 'update_port:fixed_ips:subnet_id',
self.alt_target)
def test_update_port_with_binding_vnic_type(self): def test_update_port_with_binding_vnic_type(self):
self.assertRaises( self.assertRaises(