Merge "Add test coverage of existing os-services policies"

This commit is contained in:
Zuul
2019-11-29 18:28:24 +00:00
committed by Gerrit Code Review
4 changed files with 212 additions and 40 deletions

View File

@@ -1369,43 +1369,3 @@ class ServicesTestV275(test.TestCase):
version=self.wsgi_api_version) version=self.wsgi_api_version)
self.assertRaises(exception.ValidationError, self.assertRaises(exception.ValidationError,
self.controller.index, req) self.controller.index, req)
class ServicesPolicyEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(ServicesPolicyEnforcementV21, self).setUp()
self.controller = services_v21.ServiceController()
self.req = fakes.HTTPRequest.blank('')
def test_update_policy_failed(self):
rule_name = "os_compute_api:os-services"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.update, self.req, fakes.FAKE_UUID,
body={'host': 'host1',
'binary': 'nova-compute'})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_delete_policy_failed(self):
rule_name = "os_compute_api:os-services"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.delete, self.req, fakes.FAKE_UUID)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_index_policy_failed(self):
rule_name = "os_compute_api:os-services"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.index, self.req)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())

View File

View File

@@ -0,0 +1,113 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils.fixture import uuidsentinel as uuids
from nova import context as nova_context
from nova import exception
from nova import test
from nova.tests.unit import policy_fixture
LOG = logging.getLogger(__name__)
class BasePolicyTest(test.TestCase):
def setUp(self):
super(BasePolicyTest, self).setUp()
self.policy = self.useFixture(policy_fixture.RealPolicyFixture())
self.admin_project_id = uuids.admin_project_id
self.project_id = uuids.project_id
self.project_id_other = uuids.project_id_other
# all context are with implied roles.
self.legacy_admin_context = nova_context.RequestContext(
user_id="legacy_admin", project_id=self.admin_project_id,
roles=['admin', 'member', 'reader'])
# system scoped users
self.system_admin_context = nova_context.RequestContext(
user_id="admin",
roles=['admin', 'member', 'reader'], system_scope='all')
self.system_member_context = nova_context.RequestContext(
user_id="member",
roles=['member', 'reader'], system_scope='all')
self.system_reader_context = nova_context.RequestContext(
user_id="reader", roles=['reader'], system_scope='all')
self.system_foo_context = nova_context.RequestContext(
user_id="foo", roles=['foo'], system_scope='all')
# project scoped users
self.project_admin_context = nova_context.RequestContext(
user_id="project_admin", project_id=self.project_id,
roles=['admin', 'member', 'reader'])
self.project_member_context = nova_context.RequestContext(
user_id="project_member", project_id=self.project_id,
roles=['member', 'reader'])
self.project_reader_context = nova_context.RequestContext(
user_id="project_reader", project_id=self.project_id,
roles=['reader'])
self.project_foo_context = nova_context.RequestContext(
user_id="project_foo", project_id=self.project_id,
roles=['foo'])
self.other_project_member_context = nova_context.RequestContext(
user_id="other_project_member",
project_id=self.project_id_other,
roles=['member', 'reader'])
self.all_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.other_project_member_context,
self.project_foo_context,
]
def common_policy_check(self, authorized_contexts,
unauthorized_contexts, rule_name,
func, req, *arg, **kwarg):
self.assertEqual(len(self.all_contexts),
len(authorized_contexts) + len(
unauthorized_contexts),
"Few context are missing. check all contexts "
"mentioned in self.all_contexts are tested")
def ensure_raises(req):
exc = self.assertRaises(
exception.PolicyNotAuthorized, func, req, *arg, **kwarg)
self.assertEqual(
"Policy doesn't allow %s to be performed." %
rule_name, exc.format_message())
# Verify all the context having allowed scope and roles pass
# the policy check.
for context in authorized_contexts:
LOG.info("Testing authorized context: %s", context)
req.environ['nova.context'] = context
func(req, *arg, **kwarg)
# Verify all the context not having allowed scope or roles fail
# the policy check.
for context in unauthorized_contexts:
LOG.info("Testing unauthorized context: %s", context)
req.environ['nova.context'] = context
ensure_raises(req)

View File

@@ -0,0 +1,99 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova.api.openstack.compute import services as services_v21
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit.policies import base
class ServicesPolicyTest(base.BasePolicyTest):
"""Test os-services APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(ServicesPolicyTest, self).setUp()
self.controller = services_v21.ServiceController()
self.req = fakes.HTTPRequest.blank('/services')
# Check that admin is able to change the service
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to change the service
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context
]
def test_delete_service_policy(self):
rule_name = "os_compute_api:os-services"
with mock.patch('nova.compute.api.HostAPI.service_get_by_id'):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.delete,
self.req, 1)
def test_index_service_policy(self):
rule_name = "os_compute_api:os-services"
with mock.patch('nova.compute.api.HostAPI.service_get_all'):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.index,
self.req)
def test_old_update_service_policy(self):
rule_name = "os_compute_api:os-services"
body = {'host': 'host1', 'binary': 'nova-compute'}
update = 'nova.compute.api.HostAPI.service_update_by_host_and_binary'
with mock.patch(update):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.update,
self.req, 'enable', body=body)
def test_update_service_policy(self):
rule_name = "os_compute_api:os-services"
req = fakes.HTTPRequest.blank(
'', version=services_v21.UUID_FOR_ID_MIN_VERSION)
service = self.start_service(
'compute', 'fake-compute-host').service_ref
with mock.patch('nova.compute.api.HostAPI.service_update'):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.update,
req, service.uuid,
body={'status': 'enabled'})
class ServicesScopeTypePolicyTest(ServicesPolicyTest):
"""Test os-services APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scopped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(ServicesScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")