Merge "[S-RBAC] Change policies for port's binding:profile field"

This commit is contained in:
Zuul 2024-02-20 21:35:59 +00:00 committed by Gerrit Code Review
commit f99ee353e4
11 changed files with 112 additions and 96 deletions

View File

@ -0,0 +1,4 @@
---
# Those are API policy rule overwritten to use in the fullstack tests only
"create_port:binding:profile": "rule:admin_only or rule:service_api"
"update_port:binding:profile": "rule:admin_only or rule:service_api"

View File

@ -66,7 +66,9 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.SERVICE),
scope_types=['project'],
description='Create a port',
operations=ACTION_POST,
@ -184,7 +186,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:binding:host_id',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description=(
'Specify ``binding:host_id`` '
@ -199,7 +201,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:binding:profile',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description=(
'Specify ``binding:profile`` attribute '
@ -214,7 +216,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:binding:vnic_type',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.SERVICE),
scope_types=['project'],
description=(
'Specify ``binding:vnic_type`` '
@ -302,7 +306,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:vif_type',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:vif_type`` attribute of a port',
operations=ACTION_GET,
@ -314,7 +318,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:vif_details',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:vif_details`` attribute of a port',
operations=ACTION_GET,
@ -326,7 +330,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:host_id',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:host_id`` attribute of a port',
operations=ACTION_GET,
@ -338,7 +342,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:profile',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:profile`` attribute of a port',
operations=ACTION_GET,
@ -511,7 +515,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:binding:host_id',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Update ``binding:host_id`` attribute of a port',
operations=ACTION_PUT,
@ -523,7 +527,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:binding:profile',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description='Update ``binding:profile`` attribute of a port',
operations=ACTION_PUT,

View File

@ -164,7 +164,7 @@ class NeutronConfigFixture(ConfigFixture):
return c_helpers.find_sample_file('api-paste.ini')
def _generate_policy_yaml(self):
return c_helpers.find_sample_file('policy.yaml')
return c_helpers.find_sample_file('fullstack_tests_policy.yaml')
def get_host(self):
return self.config['DEFAULT']['host']

View File

@ -109,13 +109,13 @@ class TestPortBinding(base.TestOVNFunctionalBase):
'tenant_id': self._tenant_id})
port_req = self.new_create_request('ports', port_data, self.fmt,
as_admin=True)
as_service=True)
port_res = port_req.get_response(self.api)
p = self.deserialize(self.fmt, port_res)
port_id = p['port']['id']
else:
port_req = self.new_update_request('ports', port_data, port_id,
self.fmt, as_admin=True)
self.fmt, as_service=True)
port_res = port_req.get_response(self.api)
self.deserialize(self.fmt, port_res)
@ -740,7 +740,7 @@ class TestExternalPorts(base.TestOVNFunctionalBase):
ovn_const.PORT_CAP_SWITCHDEV]}}}
port_req = self.new_create_request('ports', port_data, self.fmt,
as_admin=True)
as_service=True)
port_res = port_req.get_response(self.api)
port = self.deserialize(self.fmt, port_res)['port']
@ -789,7 +789,7 @@ class TestExternalPorts(base.TestOVNFunctionalBase):
ovn_const.PORT_CAP_SWITCHDEV]}}}
port_req = self.new_update_request(
'ports', port_upt_data, port['id'], self.fmt,
as_admin=True)
as_service=True)
port_res = port_req.get_response(self.api)
port = self.deserialize(self.fmt, port_res)['port']

View File

@ -87,7 +87,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _test_create_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
with self.port(is_admin=True,
with self.port(is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
@ -107,7 +107,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self._check_port_binding_profile(port['port'])
port_id = port['port']['id']
port = self._update('ports', port_id, {'port': profile_arg},
as_admin=True)['port']
as_service=True)['port']
self._check_port_binding_profile(port, profile)
port = self._show('ports', port_id, as_admin=True)['port']
self._check_port_binding_profile(port, profile)

View File

@ -496,12 +496,14 @@ class AdminTests(PortAPITestCase):
'create_port:binding:host_id', self.alt_target))
def test_create_port_with_binding_profile(self):
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:profile', self.target))
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:profile', self.alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.alt_target)
def test_create_port_with_binding_vnic_type(self):
self.assertTrue(
@ -679,12 +681,14 @@ class AdminTests(PortAPITestCase):
'update_port:binding:host_id', self.alt_target))
def test_update_port_with_binding_profile(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:profile', self.target))
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:profile', self.alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.alt_target)
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(
@ -1215,9 +1219,8 @@ class ServiceRoleTests(PortAPITestCase):
self.context = self.service_ctx
def test_create_port(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.target)
self.assertTrue(
policy.enforce(self.context, 'create_port', self.target))
def test_create_port_with_device_owner(self):
self.assertTrue(
@ -1251,22 +1254,19 @@ class ServiceRoleTests(PortAPITestCase):
self.target))
def test_create_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:host_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:host_id', self.target))
def test_create_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:profile', self.target))
def test_create_port_with_binding_vnic_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:vnic_type', self.target))
def test_create_port_with_allowed_address_pairs(self):
self.assertRaises(
@ -1294,28 +1294,24 @@ class ServiceRoleTests(PortAPITestCase):
policy.enforce(self.context, 'get_port', self.target))
def test_get_port_binding_vif_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:vif_type',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:vif_type', self.target))
def test_get_port_binding_vif_details(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:vif_details',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:vif_details', self.target))
def test_get_port_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:host_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:host_id', self.target))
def test_get_port_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:profile',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:profile', self.target))
def test_get_port_resource_request(self):
self.assertRaises(
@ -1359,16 +1355,14 @@ class ServiceRoleTests(PortAPITestCase):
self.target))
def test_update_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:host_id', self.target))
def test_update_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:profile', self.target))
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(

View File

@ -262,11 +262,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _service_req(self, method, resource, data=None, fmt=None, id=None,
params=None, action=None, subresource=None, sub_id=None,
ctx=None, headers=None):
ctx=None, headers=None, tenant_id=None):
tenant_id = tenant_id or 'service-project'
req = self._req(method, resource, data, fmt, id, params, action,
subresource, sub_id, ctx, headers)
req.environ['neutron.context'] = context.Context(
'service-user', 'service-project', roles=['service'])
'service-user', tenant_id, roles=['service'])
return req
def _member_req(self, method, resource, data=None, fmt=None, id=None,
@ -291,12 +292,16 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def new_create_request(self, resource, data, fmt=None, id=None,
subresource=None, context=None, tenant_id=None,
as_admin=False):
as_admin=False, as_service=False):
tenant_id = tenant_id or self._tenant_id
if as_admin:
return self._admin_req(
'POST', resource, data, fmt, id=id,
subresource=subresource, ctx=context, tenant_id=tenant_id)
elif as_service:
return self._service_req(
'POST', resource, data, fmt, id=id,
subresource=subresource, ctx=context, tenant_id=tenant_id)
return self._member_req('POST', resource, data, fmt, id=id,
subresource=subresource, ctx=context,
tenant_id=tenant_id)
@ -345,7 +350,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def new_update_request(self, resource, data, id, fmt=None,
subresource=None, context=None, sub_id=None,
headers=None, as_admin=False, tenant_id=None):
headers=None, as_admin=False, as_service=False,
tenant_id=None):
tenant_id = tenant_id or self._tenant_id
if as_admin:
return self._admin_req(
@ -353,6 +359,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
sub_id=sub_id, ctx=context, headers=headers,
tenant_id=tenant_id
)
elif as_service:
return self._service_req(
'PUT', resource, data, fmt, id=id,
subresource=subresource, ctx=context, tenant_id=tenant_id)
return self._member_req(
'PUT', resource, data, fmt, id=id, subresource=subresource,
sub_id=sub_id, ctx=context, headers=headers, tenant_id=tenant_id
@ -407,7 +417,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return req.get_response(self.api)
def _create_bulk(self, fmt, number, resource, data, name='test',
tenant_id=None, as_admin=False, **kwargs):
tenant_id=None, as_admin=False, as_service=False,
**kwargs):
"""Creates a bulk request for any kind of resource."""
tenant_id = tenant_id or self._tenant_id
objects = []
@ -421,7 +432,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt,
tenant_id=tenant_id,
as_admin=as_admin)
as_admin=as_admin,
as_service=as_service)
return req.get_response(self.api)
def _create_network(self, fmt, name, admin_state_up,
@ -517,7 +529,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return subnetpool_res
def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, is_admin=False,
arg_list=None, is_admin=False, is_service=False,
tenant_id=None, **kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'port': {'network_id': net_id,
@ -541,7 +553,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
data['port']['device_id'] = device_id
port_req = self.new_create_request('ports', data, fmt,
tenant_id=tenant_id,
as_admin=is_admin)
as_admin=is_admin,
as_service=is_service)
port_res = port_req.get_response(self.api)
if expected_res_status:
@ -564,12 +577,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _create_port_bulk(self, fmt, number, net_id, name,
admin_state_up, tenant_id=None, as_admin=False,
**kwargs):
as_service=False, **kwargs):
base_data = {'port': {'network_id': net_id,
'admin_state_up': admin_state_up}}
return self._create_bulk(fmt, number, 'port', base_data,
tenant_id=tenant_id, as_admin=as_admin,
**kwargs)
as_service=as_service, **kwargs)
def _make_network(self, fmt, name, admin_state_up, as_admin=False,
**kwargs):
@ -732,10 +745,11 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code, headers=None,
request_tenant_id=None, as_admin=False):
request_tenant_id=None, as_admin=False, as_service=False):
req = self.new_update_request(
resource, new_data, id, headers=headers,
tenant_id=request_tenant_id, as_admin=as_admin)
tenant_id=request_tenant_id, as_admin=as_admin,
as_service=as_service)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(expected_code, res.status_int)
return self.deserialize(self.fmt, res)

View File

@ -4449,7 +4449,7 @@ class TestOVNParentTagPortBinding(OVNMechanismDriverTestCase):
self._create_port(
self.fmt, n['network']['id'],
expected_res_status=404,
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
**binding)
@ -4461,7 +4461,7 @@ class TestOVNParentTagPortBinding(OVNMechanismDriverTestCase):
with self.port(s) as p:
binding[OVN_PROFILE]['parent_name'] = p['port']['id']
res = self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
**binding)
port = self.deserialize(self.fmt, res)
@ -4476,7 +4476,7 @@ class TestOVNParentTagPortBinding(OVNMechanismDriverTestCase):
with self.port(s) as p:
binding[OVN_PROFILE]['parent_name'] = p['port']['id']
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4490,7 +4490,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
**binding)
port = self.deserialize(self.fmt, res)
@ -4502,7 +4502,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4512,7 +4512,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4523,7 +4523,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4535,7 +4535,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=404,
**binding)

View File

@ -1522,7 +1522,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
portbindings.VNIC_TYPE: 'macvtap',
portbindings.PROFILE: {'bar': 'bar'}}}
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True, as_admin=True,
'test', True, as_service=True,
override=overrides)
ports = self.deserialize(self.fmt, res)['ports']
self.assertCountEqual(['direct', 'macvtap'],
@ -2577,7 +2577,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
profile_arg = {portbindings.PROFILE: {'d': s}}
try:
with self.port(expected_res_status=400,
is_admin=True,
is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg):
pass
@ -2587,7 +2587,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_remove_port_binding_profile(self):
profile = {'e': 5}
profile_arg = {portbindings.PROFILE: profile}
with self.port(is_admin=True,
with self.port(is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_port_binding_profile(port['port'], profile)
@ -2595,7 +2595,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
profile_arg = {portbindings.PROFILE: None}
port = self._update('ports', port_id,
{'port': profile_arg},
as_admin=True)['port']
as_service=True)['port']
self._check_port_binding_profile(port)
port = self._show('ports', port_id, as_admin=True)['port']
self._check_port_binding_profile(port)
@ -2790,7 +2790,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_port_binding_profile_not_changed(self):
profile = {'e': 5}
profile_arg = {portbindings.PROFILE: profile}
with self.port(is_admin=True,
with self.port(is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_port_binding_profile(port['port'], profile)

View File

@ -716,7 +716,7 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
mechanism_test.TestMechanismDriver, '_check_port_context'
):
req = self.new_update_request('ports', update_body, port_id,
as_admin=True)
as_service=True)
self.assertEqual(200, req.get_response(self.api).status_int)
def test_bind_non_pf_port_with_mac_port_not_updated(self):
@ -857,7 +857,7 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
mechanism_test.TestMechanismDriver, '_check_port_context'
):
req = self.new_update_request('ports', update_body, port['id'],
as_admin=True)
as_service=True)
self.assertEqual(200, req.get_response(self.api).status_int)
# Neutron expected to reset the MAC to a generated one so that the

View File

@ -1379,7 +1379,7 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
subnet_ids = []
subnet_ids.append(subnet['subnet']['id'])
with self.port(subnet=subnet,
is_admin=True,
is_service=True,
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=('admin_state_up',
portbindings.PROFILE,), **host_args):