diff --git a/neutron/conf/policies/__init__.py b/neutron/conf/policies/__init__.py index 5fe2153b18b..69cd64b3ef8 100644 --- a/neutron/conf/policies/__init__.py +++ b/neutron/conf/policies/__init__.py @@ -25,6 +25,8 @@ from neutron.conf.policies import floatingip from neutron.conf.policies import floatingip_pools from neutron.conf.policies import floatingip_port_forwarding from neutron.conf.policies import l3_conntrack_helper +from neutron.conf.policies import local_ip +from neutron.conf.policies import local_ip_association from neutron.conf.policies import logging from neutron.conf.policies import metering from neutron.conf.policies import network @@ -56,6 +58,8 @@ def list_rules(): floatingip_pools.list_rules(), floatingip_port_forwarding.list_rules(), l3_conntrack_helper.list_rules(), + local_ip.list_rules(), + local_ip_association.list_rules(), logging.list_rules(), metering.list_rules(), network.list_rules(), diff --git a/neutron/conf/policies/local_ip.py b/neutron/conf/policies/local_ip.py new file mode 100644 index 00000000000..47faa01a9cc --- /dev/null +++ b/neutron/conf/policies/local_ip.py @@ -0,0 +1,102 @@ +# Copyright 2021 Huawei, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import versionutils +from oslo_policy import policy + +from neutron.conf.policies import base + +COLLECTION_PATH = '/local-ips' +RESOURCE_PATH = '/local-ips/{id}' + +DEPRECATION_REASON = ( + "The Local IP API now supports system scope and default roles.") + +rules = [ + policy.DocumentedRuleDefault( + name='create_local_ip', + check_str=base.PROJECT_MEMBER, + description='Create a Local IP', + operations=[ + { + 'method': 'POST', + 'path': COLLECTION_PATH, + }, + ], + scope_types=['project'], + deprecated_rule=policy.DeprecatedRule( + name='create_local_ip', + check_str=base.RULE_ANY, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), + policy.DocumentedRuleDefault( + name='get_local_ip', + check_str=base.PROJECT_READER, + description='Get a Local IP', + operations=[ + { + 'method': 'GET', + 'path': COLLECTION_PATH, + }, + { + 'method': 'GET', + 'path': RESOURCE_PATH, + }, + ], + scope_types=['project'], + deprecated_rule=policy.DeprecatedRule( + name='get_local_ip', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), + policy.DocumentedRuleDefault( + name='update_local_ip', + check_str=base.PROJECT_MEMBER, + description='Update a Local IP', + operations=[ + { + 'method': 'PUT', + 'path': RESOURCE_PATH, + }, + ], + scope_types=['project'], + deprecated_rule=policy.DeprecatedRule( + name='update_local_ip', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), + policy.DocumentedRuleDefault( + name='delete_local_ip', + check_str=base.PROJECT_MEMBER, + description='Delete a Local IP', + operations=[ + { + 'method': 'DELETE', + 'path': RESOURCE_PATH, + }, + ], + scope_types=['project'], + deprecated_rule=policy.DeprecatedRule( + name='delete_local_ip', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), +] + + +def list_rules(): + return rules diff --git a/neutron/conf/policies/local_ip_association.py b/neutron/conf/policies/local_ip_association.py new file mode 100644 index 00000000000..9b2f6060735 --- /dev/null +++ b/neutron/conf/policies/local_ip_association.py @@ -0,0 +1,92 @@ +# Copyright 2021 Huawei, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import versionutils +from oslo_policy import policy + +from neutron.conf.policies import base + +COLLECTION_PATH = '/local_ips/{local_ip_id}/port_associations' +RESOURCE_PATH = ('/local_ips/{local_ip_id}' + '/port_associations/{fixed_port_id}') + +DEPRECATION_REASON = ( + "The Local IP API now supports system scope and default roles.") + +rules = [ + policy.DocumentedRuleDefault( + name='create_local_ip_port_association', + check_str=base.policy_or( + base.PROJECT_MEMBER, + base.RULE_PARENT_OWNER), + scope_types=['project'], + description='Create a Local IP port association', + operations=[ + { + 'method': 'POST', + 'path': COLLECTION_PATH, + }, + ], + deprecated_rule=policy.DeprecatedRule( + name='create_local_ip_port_association', + check_str=base.RULE_ADMIN_OR_PARENT_OWNER, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), + policy.DocumentedRuleDefault( + name='get_local_ip_port_association', + check_str=base.policy_or( + base.PROJECT_READER, + base.RULE_PARENT_OWNER), + scope_types=['project'], + description='Get a Local IP port association', + operations=[ + { + 'method': 'GET', + 'path': COLLECTION_PATH, + }, + { + 'method': 'GET', + 'path': RESOURCE_PATH, + }, + ], + deprecated_rule=policy.DeprecatedRule( + name='get_local_ip_port_association', + check_str=base.RULE_ADMIN_OR_PARENT_OWNER, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), + policy.DocumentedRuleDefault( + name='delete_local_ip_port_association', + check_str=base.policy_or( + base.PROJECT_MEMBER, + base.RULE_PARENT_OWNER), + scope_types=['project'], + description='Delete a Local IP port association', + operations=[ + { + 'method': 'DELETE', + 'path': RESOURCE_PATH, + }, + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_local_ip_port_association', + check_str=base.RULE_ADMIN_OR_PARENT_OWNER, + deprecated_reason=DEPRECATION_REASON, + deprecated_since=versionutils.deprecated.WALLABY) + ), +] + + +def list_rules(): + return rules diff --git a/neutron/tests/unit/conf/policies/test_local_ip.py b/neutron/tests/unit/conf/policies/test_local_ip.py new file mode 100644 index 00000000000..3d4585e3754 --- /dev/null +++ b/neutron/tests/unit/conf/policies/test_local_ip.py @@ -0,0 +1,168 @@ +# Copyright 2021 Huawei, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy as base_policy + +from neutron import policy +from neutron.tests.unit.conf.policies import base + + +class LocalIPAPITestCase(base.PolicyBaseTestCase): + + def setUp(self): + super(LocalIPAPITestCase, self).setUp() + self.target = { + 'project_id': self.project_id, + 'local_ip_address': '172.24.4.228'} + self.alt_target = { + 'project_id': self.alt_project_id, + 'local_ip_address': '172.24.4.229'} + + +class SystemAdminTests(LocalIPAPITestCase): + + def setUp(self): + super(SystemAdminTests, self).setUp() + self.context = self.system_admin_ctx + + def test_create_local_ip(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, "create_local_ip", self.target) + + def test_get_local_ip(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, "get_local_ip", self.target) + + def test_update_local_ip(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, "update_local_ip", self.target) + + def test_delete_local_ip(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, "delete_local_ip", self.target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super(SystemMemberTests, self).setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super(SystemReaderTests, self).setUp() + self.context = self.system_reader_ctx + + +class ProjectAdminTests(LocalIPAPITestCase): + + def setUp(self): + super(ProjectAdminTests, self).setUp() + self.context = self.project_admin_ctx + + def test_create_local_ip(self): + self.assertTrue( + policy.enforce(self.context, "create_local_ip", self.target)) + + def test_create_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "create_local_ip", self.alt_target) + + def test_get_local_ip(self): + self.assertTrue( + policy.enforce(self.context, "get_local_ip", self.target)) + + def test_get_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "get_local_ip", self.alt_target) + + def test_update_local_ip(self): + self.assertTrue( + policy.enforce(self.context, "update_local_ip", self.target)) + + def test_update_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "update_local_ip", self.alt_target) + + def test_delete_local_ip(self): + self.assertTrue( + policy.enforce(self.context, "delete_local_ip", self.target)) + + def test_delete_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "delete_local_ip", self.alt_target) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + + +class ProjectReaderTests(LocalIPAPITestCase): + + def setUp(self): + super(ProjectReaderTests, self).setUp() + self.context = self.project_reader_ctx + + def test_create_localip(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "create_local_ip", self.target) + + def test_create_localip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "create_local_ip", self.alt_target) + + def test_get_local_ip(self): + self.assertTrue( + policy.enforce(self.context, "get_local_ip", self.target)) + + def test_get_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "get_local_ip", self.alt_target) + + def test_update_local_ip(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "update_local_ip", self.target) + + def test_update_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "update_local_ip", self.alt_target) + + def test_delete_local_ip(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "delete_local_ip", self.target) + + def test_delete_local_ip_other_project(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, "delete_local_ip", self.alt_target) diff --git a/neutron/tests/unit/conf/policies/test_local_ip_association.py b/neutron/tests/unit/conf/policies/test_local_ip_association.py new file mode 100644 index 00000000000..3162862b205 --- /dev/null +++ b/neutron/tests/unit/conf/policies/test_local_ip_association.py @@ -0,0 +1,181 @@ +# Copyright 2021 Huawei, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from oslo_policy import policy as base_policy +from oslo_utils import uuidutils + +from neutron import policy +from neutron.tests.unit.conf.policies import base + + +class LocalIPAssociationAPITestCase(base.PolicyBaseTestCase): + + def setUp(self): + super(LocalIPAssociationAPITestCase, self).setUp() + self.local_ip = { + 'id': uuidutils.generate_uuid(), + 'project_id': self.project_id} + + self.target = { + 'project_id': self.project_id, + 'local_ip_id': self.local_ip['id'], + 'ext_parent_local_ip_id': self.local_ip['id']} + self.alt_target = { + 'project_id': self.alt_project_id, + 'local_ip_id': self.local_ip['id'], + 'ext_parent_local_ip_id': self.local_ip['id']} + + self.plugin_mock = mock.Mock() + self.plugin_mock.get_local_ip.return_value = self.local_ip + mock.patch( + 'neutron_lib.plugins.directory.get_plugin', + return_value=self.plugin_mock).start() + + +class SystemAdminTests(LocalIPAssociationAPITestCase): + + def setUp(self): + super(SystemAdminTests, self).setUp() + self.context = self.system_admin_ctx + + def test_create_local_ip_port_association(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'create_local_ip_port_association', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'create_local_ip_port_association', + self.alt_target) + + def test_get_local_ip_port_association(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_local_ip_port_association', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'get_local_ip_port_association', + self.alt_target) + + def test_delete_local_ip_port_association(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'delete_local_ip_port_association', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, + self.context, 'delete_local_ip_port_association', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super(SystemMemberTests, self).setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super(SystemReaderTests, self).setUp() + self.context = self.system_reader_ctx + + +class ProjectAdminTests(LocalIPAssociationAPITestCase): + + def setUp(self): + super(ProjectAdminTests, self).setUp() + self.context = self.project_admin_ctx + + def test_create_local_ip_port_association(self): + self.assertTrue( + policy.enforce(self.context, + 'create_local_ip_port_association', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_local_ip_port_association', + self.alt_target) + + def test_get_local_ip_port_association(self): + self.assertTrue( + policy.enforce(self.context, + 'get_local_ip_port_association', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_local_ip_port_association', + self.alt_target) + + def test_delete_local_ip_port_association(self): + self.assertTrue( + policy.enforce(self.context, + 'delete_local_ip_port_association', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_local_ip_port_association', + self.alt_target) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + + +class ProjectReaderTests(ProjectMemberTests): + + def setUp(self): + super(ProjectReaderTests, self).setUp() + self.context = self.project_reader_ctx + + def test_create_local_ip_port_association(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_local_ip_port_association', + self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_local_ip_port_association', + self.alt_target) + + def test_delete_local_ip_port_association(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_local_ip_port_association', + self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_local_ip_port_association', + self.alt_target) diff --git a/neutron/tests/unit/extensions/test_local_ip.py b/neutron/tests/unit/extensions/test_local_ip.py index 4fd237dbf06..5de762d8b15 100644 --- a/neutron/tests/unit/extensions/test_local_ip.py +++ b/neutron/tests/unit/extensions/test_local_ip.py @@ -48,7 +48,7 @@ class LocalIPTestBase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): req = self.new_create_request('local-ips', local_ip) neutron_context = context.Context( - '', kwargs.get('project_id', self._tenant_id)) + '', kwargs.get('project_id', self._tenant_id), is_admin=True) req.environ['neutron.context'] = neutron_context res = req.get_response(self.ext_api) if res.status_int >= webob.exc.HTTPClientError.code: