Separate CRUD policy for server_groups

The same policy rule (os_compute_api:os-server-groups) is being used
for all actions (show, index, delete, create) for server_groups REST
APIs. It is thus impossible to provide different RBAC for specific
actions based on roles. To address this changes are made to have
separate policy rules for each of action.

It has been argued that index and show may not need separate policy
rules, but most other places in nova (and OpenStack in general) do
have separate policy rules for each action. This affords the ultimate
flexibility to deployers, who can obviously use the same rule if
that is what they want. One example where show and index may be
different is that if show is restricted based on some criteria, such
that a user is able to see some resources within the tenant but not
others, then list would need to be disallowed to prevent the user
from using list to see resources they cannot show.

Change-Id: Ica9e07f6e80257902b4a0cc44b65fd6bad008bba
Closes-Bug: #1636157
This commit is contained in:
Prashanth kumar reddy 2016-10-27 07:09:01 -04:00 committed by Matthew Edmonds
parent a58c7e5173
commit 4a09c2210b
7 changed files with 172 additions and 16 deletions

View File

@ -36,9 +36,9 @@ LOG = logging.getLogger(__name__)
ALIAS = "os-server-groups"
def _authorize_context(req):
def _authorize_context(req, action):
context = req.environ['nova.context']
context.can(sg_policies.BASE_POLICY_NAME)
context.can(sg_policies.POLICY_ROOT % action)
return context
@ -75,7 +75,7 @@ class ServerGroupController(wsgi.Controller):
@extensions.expected_errors(404)
def show(self, req, id):
"""Return data about the given server group."""
context = _authorize_context(req)
context = _authorize_context(req, 'show')
try:
sg = objects.InstanceGroup.get_by_uuid(context, id)
except nova.exception.InstanceGroupNotFound as e:
@ -86,7 +86,7 @@ class ServerGroupController(wsgi.Controller):
@extensions.expected_errors(404)
def delete(self, req, id):
"""Delete an server group."""
context = _authorize_context(req)
context = _authorize_context(req, 'delete')
try:
sg = objects.InstanceGroup.get_by_uuid(context, id)
except nova.exception.InstanceGroupNotFound as e:
@ -117,7 +117,7 @@ class ServerGroupController(wsgi.Controller):
@extensions.expected_errors(())
def index(self, req):
"""Returns a list of server groups."""
context = _authorize_context(req)
context = _authorize_context(req, 'index')
project_id = context.project_id
if 'all_projects' in req.GET and context.is_admin:
sgs = objects.InstanceGroupList.get_all(context)
@ -135,7 +135,7 @@ class ServerGroupController(wsgi.Controller):
@validation.schema(schema.create_v215, "2.15")
def create(self, req, body):
"""Creates a new server group."""
context = _authorize_context(req)
context = _authorize_context(req, 'create')
quotas = objects.Quotas(context=context)
try:

View File

@ -20,6 +20,7 @@ from nova.policies import base
BASE_POLICY_NAME = 'os_compute_api:os-server-groups'
POLICY_ROOT = 'os_compute_api:os-server-groups:%s'
BASE_POLICY_RULE = 'rule:%s' % BASE_POLICY_NAME
server_groups_policies = [
@ -29,6 +30,18 @@ server_groups_policies = [
policy.RuleDefault(
name=BASE_POLICY_NAME,
check_str=base.RULE_ADMIN_OR_OWNER),
policy.RuleDefault(
name=POLICY_ROOT % 'create',
check_str=BASE_POLICY_RULE),
policy.RuleDefault(
name=POLICY_ROOT % 'delete',
check_str=BASE_POLICY_RULE),
policy.RuleDefault(
name=POLICY_ROOT % 'index',
check_str=BASE_POLICY_RULE),
policy.RuleDefault(
name=POLICY_ROOT % 'show',
check_str=BASE_POLICY_RULE),
]

View File

@ -22,8 +22,10 @@ from nova import context
import nova.db
from nova import exception
from nova import objects
from nova.policies import server_groups as sg_policies
from nova import test
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import policy_fixture
from nova.tests import uuidsentinel
@ -80,6 +82,9 @@ class ServerGroupTestV21(test.TestCase):
super(ServerGroupTestV21, self).setUp()
self._setup_controller()
self.req = fakes.HTTPRequest.blank('')
self.admin_req = fakes.HTTPRequest.blank('', use_admin_context=True)
self.foo_req = fakes.HTTPRequest.blank('', project_id='foo')
self.policy = self.useFixture(policy_fixture.RealPolicyFixture())
def _setup_controller(self):
self.controller = sg_v21.ServerGroupController()
@ -103,6 +108,36 @@ class ServerGroupTestV21(test.TestCase):
for policy in policies:
self._create_server_group_normal([policy])
def test_create_server_group_rbac_default(self):
sgroup = server_group_template()
sgroup['policies'] = ['affinity']
# test as admin
self.controller.create(self.admin_req, body={'server_group': sgroup})
# test as non-admin
self.controller.create(self.req, body={'server_group': sgroup})
def test_create_server_group_rbac_admin_only(self):
sgroup = server_group_template()
sgroup['policies'] = ['affinity']
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'create'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
self.controller.create(self.admin_req, body={'server_group': sgroup})
# check for failure as non-admin
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.create, self.req,
body={'server_group': sgroup})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def _create_instance(self, context):
instance = objects.Instance(context=context,
image_ref=uuidsentinel.fake_image_ref,
@ -182,12 +217,15 @@ class ServerGroupTestV21(test.TestCase):
path = '/os-server-groups?all_projects=True'
req = fakes.HTTPRequest.blank(path, use_admin_context=True,
version=api_version)
res_dict = self.controller.index(req)
req = fakes.HTTPRequest.blank(path, version=api_version)
admin_req = fakes.HTTPRequest.blank(path, use_admin_context=True,
version=api_version)
# test as admin
res_dict = self.controller.index(admin_req)
self.assertEqual(all, res_dict)
req = fakes.HTTPRequest.blank(path,
version=api_version)
# test as non-admin
res_dict = self.controller.index(req)
self.assertEqual(tenant_specific, res_dict)
@ -235,9 +273,8 @@ class ServerGroupTestV21(test.TestCase):
return_get_by_project = return_server_groups()
mock_get_by_project.return_value = return_get_by_project
path = '/os-server-groups'
self.req = fakes.HTTPRequest.blank(path,
version=api_version)
res_dict = self.controller.index(self.req)
req = fakes.HTTPRequest.blank(path, version=api_version)
res_dict = self.controller.index(req)
self.assertEqual(expected, res_dict)
def test_display_members(self):
@ -269,6 +306,39 @@ class ServerGroupTestV21(test.TestCase):
self.assertEqual(1, len(result_members))
self.assertIn(instances[0].uuid, result_members)
def test_display_members_rbac_default(self):
ctx = context.RequestContext('fake_user', 'fake')
ig_uuid = self._create_groups_and_instances(ctx)[0]
# test as admin
self.controller.show(self.admin_req, ig_uuid)
# test as non-admin, same project
self.controller.show(self.req, ig_uuid)
# test as non-admin, different project
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, self.foo_req, ig_uuid)
def test_display_members_rbac_admin_only(self):
ctx = context.RequestContext('fake_user', 'fake')
ig_uuid = self._create_groups_and_instances(ctx)[0]
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'show'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
self.controller.show(self.admin_req, ig_uuid)
# check for failure as non-admin
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.show, self.req, ig_uuid)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_create_server_group_with_non_alphanumeric_in_name(self):
# The fix for bug #1434335 expanded the allowable character set
# for server group names to include non-alphanumeric characters
@ -384,6 +454,29 @@ class ServerGroupTestV21(test.TestCase):
def test_list_server_group_all(self):
self._test_list_server_group_all(api_version='2.1')
def test_list_server_groups_rbac_default(self):
# test as admin
self.controller.index(self.admin_req)
# test as non-admin
self.controller.index(self.req)
def test_list_server_groups_rbac_admin_only(self):
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'index'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
self.controller.index(self.admin_req)
# check for failure as non-admin
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.index, self.req)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_delete_server_group_by_id(self):
sg = server_group_template(id=uuidsentinel.sg1_id)
@ -416,6 +509,37 @@ class ServerGroupTestV21(test.TestCase):
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
self.req, 'invalid')
def test_delete_server_group_rbac_default(self):
ctx = context.RequestContext('fake_user', 'fake')
# test as admin
ig_uuid = self._create_groups_and_instances(ctx)[0]
self.controller.delete(self.admin_req, ig_uuid)
# test as non-admin
ig_uuid = self._create_groups_and_instances(ctx)[0]
self.controller.delete(self.req, ig_uuid)
def test_delete_server_group_rbac_admin_only(self):
ctx = context.RequestContext('fake_user', 'fake')
# override policy to restrict to admin
rule_name = sg_policies.POLICY_ROOT % 'delete'
rules = {rule_name: 'is_admin:True'}
self.policy.set_rules(rules, overwrite=False)
# check for success as admin
ig_uuid = self._create_groups_and_instances(ctx)[0]
self.controller.delete(self.admin_req, ig_uuid)
# check for failure as non-admin
ig_uuid = self._create_groups_and_instances(ctx)[0]
exc = self.assertRaises(exception.PolicyNotAuthorized,
self.controller.delete, self.req, ig_uuid)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
class ServerGroupTestV213(ServerGroupTestV21):
wsgi_api_version = '2.13'

View File

@ -102,6 +102,10 @@ policy_data = """
"os_compute_api:os-server-tags:delete_all": "",
"os_compute_api:os-server-usage": "",
"os_compute_api:os-server-groups": "",
"os_compute_api:os-server-groups:show": "",
"os_compute_api:os-server-groups:index": "",
"os_compute_api:os-server-groups:create": "",
"os_compute_api:os-server-groups:delete": "",
"os_compute_api:os-services": "",
"os_compute_api:os-shelve:shelve": "",
"os_compute_api:os-shelve:shelve_offload": "",

View File

@ -54,9 +54,10 @@ class RealPolicyFixture(fixtures.Fixture):
nova.policy.init()
self.addCleanup(nova.policy.reset)
def set_rules(self, rules):
def set_rules(self, rules, overwrite=True):
policy = nova.policy._ENFORCER
policy.set_rules(oslo_policy.Rules.from_dict(rules))
policy.set_rules(oslo_policy.Rules.from_dict(rules),
overwrite=overwrite)
def add_missing_default_rules(self, rules):
"""Adds default rules and their values to the given rules dict.

View File

@ -402,6 +402,10 @@ class RealRolePolicyTestCase(test.NoDBTestCase):
"os_compute_api:os-server-password",
"os_compute_api:os-server-usage",
"os_compute_api:os-server-groups",
"os_compute_api:os-server-groups:index",
"os_compute_api:os-server-groups:show",
"os_compute_api:os-server-groups:create",
"os_compute_api:os-server-groups:delete",
"os_compute_api:os-shelve:shelve",
"os_compute_api:os-shelve:unshelve",
"os_compute_api:os-virtual-interfaces",

View File

@ -0,0 +1,10 @@
---
features:
- |
The same policy rule (os_compute_api:os-server-groups)
was being used for all actions (show, index, delete, create)
for server_groups REST APIs. It was thus impossible to provide
different RBAC for specific actions based on roles. To address
this changes are made to have separate policy rules for each
action. The original rule (os_compute_api:os-server-groups) is
left unchanged for backward compatibility.