diff --git a/nova/tests/unit/api/openstack/compute/test_services.py b/nova/tests/unit/api/openstack/compute/test_services.py index 8bcad222bd44..884a9a31862f 100644 --- a/nova/tests/unit/api/openstack/compute/test_services.py +++ b/nova/tests/unit/api/openstack/compute/test_services.py @@ -1369,43 +1369,3 @@ class ServicesTestV275(test.TestCase): version=self.wsgi_api_version) self.assertRaises(exception.ValidationError, self.controller.index, req) - - -class ServicesPolicyEnforcementV21(test.NoDBTestCase): - - def setUp(self): - super(ServicesPolicyEnforcementV21, self).setUp() - self.controller = services_v21.ServiceController() - self.req = fakes.HTTPRequest.blank('') - - def test_update_policy_failed(self): - rule_name = "os_compute_api:os-services" - self.policy.set_rules({rule_name: "project_id:non_fake"}) - exc = self.assertRaises( - exception.PolicyNotAuthorized, - self.controller.update, self.req, fakes.FAKE_UUID, - body={'host': 'host1', - 'binary': 'nova-compute'}) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) - - def test_delete_policy_failed(self): - rule_name = "os_compute_api:os-services" - self.policy.set_rules({rule_name: "project_id:non_fake"}) - exc = self.assertRaises( - exception.PolicyNotAuthorized, - self.controller.delete, self.req, fakes.FAKE_UUID) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) - - def test_index_policy_failed(self): - rule_name = "os_compute_api:os-services" - self.policy.set_rules({rule_name: "project_id:non_fake"}) - exc = self.assertRaises( - exception.PolicyNotAuthorized, - self.controller.index, self.req) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) diff --git a/nova/tests/unit/policies/__init__.py b/nova/tests/unit/policies/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/nova/tests/unit/policies/base.py b/nova/tests/unit/policies/base.py new file mode 100644 index 000000000000..bcb7a28aef2a --- /dev/null +++ b/nova/tests/unit/policies/base.py @@ -0,0 +1,113 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging +from oslo_utils.fixture import uuidsentinel as uuids + +from nova import context as nova_context +from nova import exception +from nova import test +from nova.tests.unit import policy_fixture + + +LOG = logging.getLogger(__name__) + + +class BasePolicyTest(test.TestCase): + + def setUp(self): + super(BasePolicyTest, self).setUp() + self.policy = self.useFixture(policy_fixture.RealPolicyFixture()) + + self.admin_project_id = uuids.admin_project_id + self.project_id = uuids.project_id + self.project_id_other = uuids.project_id_other + + # all context are with implied roles. + self.legacy_admin_context = nova_context.RequestContext( + user_id="legacy_admin", project_id=self.admin_project_id, + roles=['admin', 'member', 'reader']) + + # system scoped users + self.system_admin_context = nova_context.RequestContext( + user_id="admin", + roles=['admin', 'member', 'reader'], system_scope='all') + + self.system_member_context = nova_context.RequestContext( + user_id="member", + roles=['member', 'reader'], system_scope='all') + + self.system_reader_context = nova_context.RequestContext( + user_id="reader", roles=['reader'], system_scope='all') + + self.system_foo_context = nova_context.RequestContext( + user_id="foo", roles=['foo'], system_scope='all') + + # project scoped users + self.project_admin_context = nova_context.RequestContext( + user_id="project_admin", project_id=self.project_id, + roles=['admin', 'member', 'reader']) + + self.project_member_context = nova_context.RequestContext( + user_id="project_member", project_id=self.project_id, + roles=['member', 'reader']) + + self.project_reader_context = nova_context.RequestContext( + user_id="project_reader", project_id=self.project_id, + roles=['reader']) + + self.project_foo_context = nova_context.RequestContext( + user_id="project_foo", project_id=self.project_id, + roles=['foo']) + + self.other_project_member_context = nova_context.RequestContext( + user_id="other_project_member", + project_id=self.project_id_other, + roles=['member', 'reader']) + + self.all_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.system_member_context, self.system_reader_context, + self.system_foo_context, + self.project_admin_context, self.project_member_context, + self.project_reader_context, self.other_project_member_context, + self.project_foo_context, + ] + + def common_policy_check(self, authorized_contexts, + unauthorized_contexts, rule_name, + func, req, *arg, **kwarg): + + self.assertEqual(len(self.all_contexts), + len(authorized_contexts) + len( + unauthorized_contexts), + "Few context are missing. check all contexts " + "mentioned in self.all_contexts are tested") + + def ensure_raises(req): + exc = self.assertRaises( + exception.PolicyNotAuthorized, func, req, *arg, **kwarg) + self.assertEqual( + "Policy doesn't allow %s to be performed." % + rule_name, exc.format_message()) + # Verify all the context having allowed scope and roles pass + # the policy check. + for context in authorized_contexts: + LOG.info("Testing authorized context: %s", context) + req.environ['nova.context'] = context + func(req, *arg, **kwarg) + # Verify all the context not having allowed scope or roles fail + # the policy check. + for context in unauthorized_contexts: + LOG.info("Testing unauthorized context: %s", context) + req.environ['nova.context'] = context + ensure_raises(req) diff --git a/nova/tests/unit/policies/test_services.py b/nova/tests/unit/policies/test_services.py new file mode 100644 index 000000000000..058c7d26a409 --- /dev/null +++ b/nova/tests/unit/policies/test_services.py @@ -0,0 +1,99 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import mock + +from nova.api.openstack.compute import services as services_v21 +from nova.tests.unit.api.openstack import fakes +from nova.tests.unit.policies import base + + +class ServicesPolicyTest(base.BasePolicyTest): + """Test os-services APIs policies with all possible context. + + This class defines the set of context with different roles + which are allowed and not allowed to pass the policy checks. + With those set of context, it will call the API operation and + verify the expected behaviour. + """ + + def setUp(self): + super(ServicesPolicyTest, self).setUp() + self.controller = services_v21.ServiceController() + self.req = fakes.HTTPRequest.blank('/services') + # Check that admin is able to change the service + self.admin_authorized_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context] + # Check that non-admin is not able to change the service + self.admin_unauthorized_contexts = [ + self.system_member_context, self.system_reader_context, + self.system_foo_context, self.project_member_context, + self.other_project_member_context, + self.project_foo_context, self.project_reader_context + ] + + def test_delete_service_policy(self): + rule_name = "os_compute_api:os-services" + with mock.patch('nova.compute.api.HostAPI.service_get_by_id'): + self.common_policy_check(self.admin_authorized_contexts, + self.admin_unauthorized_contexts, + rule_name, self.controller.delete, + self.req, 1) + + def test_index_service_policy(self): + rule_name = "os_compute_api:os-services" + with mock.patch('nova.compute.api.HostAPI.service_get_all'): + self.common_policy_check(self.admin_authorized_contexts, + self.admin_unauthorized_contexts, + rule_name, self.controller.index, + self.req) + + def test_old_update_service_policy(self): + rule_name = "os_compute_api:os-services" + body = {'host': 'host1', 'binary': 'nova-compute'} + update = 'nova.compute.api.HostAPI.service_update_by_host_and_binary' + with mock.patch(update): + self.common_policy_check(self.admin_authorized_contexts, + self.admin_unauthorized_contexts, + rule_name, self.controller.update, + self.req, 'enable', body=body) + + def test_update_service_policy(self): + rule_name = "os_compute_api:os-services" + req = fakes.HTTPRequest.blank( + '', version=services_v21.UUID_FOR_ID_MIN_VERSION) + service = self.start_service( + 'compute', 'fake-compute-host').service_ref + with mock.patch('nova.compute.api.HostAPI.service_update'): + self.common_policy_check(self.admin_authorized_contexts, + self.admin_unauthorized_contexts, + rule_name, self.controller.update, + req, service.uuid, + body={'status': 'enabled'}) + + +class ServicesScopeTypePolicyTest(ServicesPolicyTest): + """Test os-services APIs policies with system scope enabled. + + This class set the nova.conf [oslo_policy] enforce_scope to True + so that we can switch on the scope checking on oslo policy side. + It defines the set of context with scopped token + which are allowed and not allowed to pass the policy checks. + With those set of context, it will run the API operation and + verify the expected behaviour. + """ + + def setUp(self): + super(ServicesScopeTypePolicyTest, self).setUp() + self.flags(enforce_scope=True, group="oslo_policy")