[S-RBAC] Add API policies for get and activate port bindings

There wasn't policies for get port binding and activate port binding API
calls defined at all.
When we switched to new default policies and regular user wanted to make
call to activate port binding, it was error 500 what we returned instead
of proper 4xx error. It was like that as "get_port_binding" call which
was done internally during "activate" API request falled back to the
default policy which is "admin_or_owner" and as port binding resource
don't have project_id, owner couldn't be checked there.

Now it has defined S-RBAC policies for those API calls and it is allowed
for admin users only to solve that problem.
This patch don't define old, deprecated policies for those API calls as
it wasn't really needed there and we already switched to new policies by
default now.

Closes-Bug: #2013326
Change-Id: Id281e4950dc5d7bac62dfa8175d82cb1f8d2e855
This commit is contained in:
Slawek Kaplonski 2023-05-29 14:28:46 +02:00
parent 8fe532d7ea
commit 61b358b6b5
3 changed files with 153 additions and 0 deletions

View File

@ -34,6 +34,7 @@ from neutron.conf.policies import network
from neutron.conf.policies import network_ip_availability
from neutron.conf.policies import network_segment_range
from neutron.conf.policies import port
from neutron.conf.policies import port_bindings
from neutron.conf.policies import qos
from neutron.conf.policies import quotas
from neutron.conf.policies import rbac
@ -67,6 +68,7 @@ def list_rules():
network.list_rules(),
network_ip_availability.list_rules(),
network_segment_range.list_rules(),
port_bindings.list_rules(),
port.list_rules(),
qos.list_rules(),
quotas.list_rules(),

View File

@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from neutron.conf.policies import base
GET_BINDING_PATH = '/ports/{port_id}/bindings/'
ACTIVATE_BINDING_PATH = '/ports/{port_id}/bindings/{host}'
rules = [
policy.DocumentedRuleDefault(
name='get_port_binding',
check_str=base.ADMIN,
scope_types=['project'],
description='Get port binding information',
operations=[
{
'method': 'GET',
'path': GET_BINDING_PATH,
},
],
),
policy.DocumentedRuleDefault(
name='activate',
check_str=base.ADMIN,
scope_types=['project'],
description='Activate port binding on the host',
operations=[
{
'method': 'GET',
'path': ACTIVATE_BINDING_PATH,
},
],
),
]
def list_rules():
return rules

View File

@ -0,0 +1,100 @@
# Copyright (c) 2021 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_policy import policy as base_policy
from neutron import policy
from neutron.tests.unit.conf.policies import test_base as base
class PortBindingsAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
super(PortBindingsAPITestCase, self).setUp()
self.target = {}
class SystemAdminTests(PortBindingsAPITestCase):
def setUp(self):
super(SystemAdminTests, self).setUp()
self.context = self.system_admin_ctx
def test_get_port_binding(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, "get_port_binding", self.target)
def test_activate_port_binding(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, "activate", self.target)
class SystemMemberTests(SystemAdminTests):
def setUp(self):
super(SystemMemberTests, self).setUp()
self.context = self.system_member_ctx
class SystemReaderTests(SystemMemberTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
self.context = self.system_reader_ctx
class AdminTests(PortBindingsAPITestCase):
def setUp(self):
super(AdminTests, self).setUp()
self.context = self.project_admin_ctx
def test_get_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "get_port_binding", self.target))
def test_activate_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "activate", self.target))
class ProjectMemberTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
def test_get_port_binding(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_port_binding", self.target)
def test_activate_port_binding(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "activate", self.target)
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx