Add test coverage of existing os-services policies

Current tests do not have good test coverage of existing policies.
Either tests for policies do not exist or if they exist then they
do not cover the actual negative and positive testing.

For Example, if any policy with default rule as admin only then
test should verify:
- policy check pass with context having admin role
- policy check fail with context having any other role than admin

As discussed in policy-defaults-refresh [1], to change the policies
with new default roles and scope_type, we need to have the enough
testing coverage of existing policy behavior.
When we will add the scope_type in policies or new default roles,
then these test coverage will be extended to adopt the new changes
and also make sure we do not break the existing behavior.

This commit covers the testing coverage of existing os-services policies.

Partial implement blueprint policy-defaults-refresh

[1] https://specs.openstack.org/openstack/nova-specs/specs/train/approved/policy-default-refresh.html#testing

Change-Id: I3652506650156abb2f5a4e012064081a0fa95f43
This commit is contained in:
Ghanshyam Mann
2019-08-15 08:44:13 +00:00
parent e2bba0b15e
commit 68f195928e
4 changed files with 212 additions and 40 deletions

View File

@@ -1369,43 +1369,3 @@ class ServicesTestV275(test.TestCase):
version=self.wsgi_api_version)
self.assertRaises(exception.ValidationError,
self.controller.index, req)
class ServicesPolicyEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(ServicesPolicyEnforcementV21, self).setUp()
self.controller = services_v21.ServiceController()
self.req = fakes.HTTPRequest.blank('')
def test_update_policy_failed(self):
rule_name = "os_compute_api:os-services"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.update, self.req, fakes.FAKE_UUID,
body={'host': 'host1',
'binary': 'nova-compute'})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_delete_policy_failed(self):
rule_name = "os_compute_api:os-services"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.delete, self.req, fakes.FAKE_UUID)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_index_policy_failed(self):
rule_name = "os_compute_api:os-services"
self.policy.set_rules({rule_name: "project_id:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.index, self.req)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())

View File

View File

@@ -0,0 +1,113 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils.fixture import uuidsentinel as uuids
from nova import context as nova_context
from nova import exception
from nova import test
from nova.tests.unit import policy_fixture
LOG = logging.getLogger(__name__)
class BasePolicyTest(test.TestCase):
def setUp(self):
super(BasePolicyTest, self).setUp()
self.policy = self.useFixture(policy_fixture.RealPolicyFixture())
self.admin_project_id = uuids.admin_project_id
self.project_id = uuids.project_id
self.project_id_other = uuids.project_id_other
# all context are with implied roles.
self.legacy_admin_context = nova_context.RequestContext(
user_id="legacy_admin", project_id=self.admin_project_id,
roles=['admin', 'member', 'reader'])
# system scoped users
self.system_admin_context = nova_context.RequestContext(
user_id="admin",
roles=['admin', 'member', 'reader'], system_scope='all')
self.system_member_context = nova_context.RequestContext(
user_id="member",
roles=['member', 'reader'], system_scope='all')
self.system_reader_context = nova_context.RequestContext(
user_id="reader", roles=['reader'], system_scope='all')
self.system_foo_context = nova_context.RequestContext(
user_id="foo", roles=['foo'], system_scope='all')
# project scoped users
self.project_admin_context = nova_context.RequestContext(
user_id="project_admin", project_id=self.project_id,
roles=['admin', 'member', 'reader'])
self.project_member_context = nova_context.RequestContext(
user_id="project_member", project_id=self.project_id,
roles=['member', 'reader'])
self.project_reader_context = nova_context.RequestContext(
user_id="project_reader", project_id=self.project_id,
roles=['reader'])
self.project_foo_context = nova_context.RequestContext(
user_id="project_foo", project_id=self.project_id,
roles=['foo'])
self.other_project_member_context = nova_context.RequestContext(
user_id="other_project_member",
project_id=self.project_id_other,
roles=['member', 'reader'])
self.all_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.other_project_member_context,
self.project_foo_context,
]
def common_policy_check(self, authorized_contexts,
unauthorized_contexts, rule_name,
func, req, *arg, **kwarg):
self.assertEqual(len(self.all_contexts),
len(authorized_contexts) + len(
unauthorized_contexts),
"Few context are missing. check all contexts "
"mentioned in self.all_contexts are tested")
def ensure_raises(req):
exc = self.assertRaises(
exception.PolicyNotAuthorized, func, req, *arg, **kwarg)
self.assertEqual(
"Policy doesn't allow %s to be performed." %
rule_name, exc.format_message())
# Verify all the context having allowed scope and roles pass
# the policy check.
for context in authorized_contexts:
LOG.info("Testing authorized context: %s", context)
req.environ['nova.context'] = context
func(req, *arg, **kwarg)
# Verify all the context not having allowed scope or roles fail
# the policy check.
for context in unauthorized_contexts:
LOG.info("Testing unauthorized context: %s", context)
req.environ['nova.context'] = context
ensure_raises(req)

View File

@@ -0,0 +1,99 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova.api.openstack.compute import services as services_v21
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit.policies import base
class ServicesPolicyTest(base.BasePolicyTest):
"""Test os-services APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(ServicesPolicyTest, self).setUp()
self.controller = services_v21.ServiceController()
self.req = fakes.HTTPRequest.blank('/services')
# Check that admin is able to change the service
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to change the service
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context
]
def test_delete_service_policy(self):
rule_name = "os_compute_api:os-services"
with mock.patch('nova.compute.api.HostAPI.service_get_by_id'):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.delete,
self.req, 1)
def test_index_service_policy(self):
rule_name = "os_compute_api:os-services"
with mock.patch('nova.compute.api.HostAPI.service_get_all'):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.index,
self.req)
def test_old_update_service_policy(self):
rule_name = "os_compute_api:os-services"
body = {'host': 'host1', 'binary': 'nova-compute'}
update = 'nova.compute.api.HostAPI.service_update_by_host_and_binary'
with mock.patch(update):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.update,
self.req, 'enable', body=body)
def test_update_service_policy(self):
rule_name = "os_compute_api:os-services"
req = fakes.HTTPRequest.blank(
'', version=services_v21.UUID_FOR_ID_MIN_VERSION)
service = self.start_service(
'compute', 'fake-compute-host').service_ref
with mock.patch('nova.compute.api.HostAPI.service_update'):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.update,
req, service.uuid,
body={'status': 'enabled'})
class ServicesScopeTypePolicyTest(ServicesPolicyTest):
"""Test os-services APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scopped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(ServicesScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")