Derive context is_admin from policy

Use a rule in the policy.json to correctly derive a user's admin-ness
based on their roles, when it is not explicitly specified in the
RequestContext constructor.

Keystone auth_token tests X-Roles based on the user role membership
for the tenant the token is scoped to, so this will allow us to define
per-tenant admins (the admin everywhere issue described in keystone
bug #968696 won't apply here, that is specific to keystone).

Remove the admin_role config option, which doensn't do anything, and
now this should be specified via policy.json.

Change-Id: I1921a2b515ecc8ca5d37205e3cdb6b7b8695b1ac
bluprint: request-scoping-policy
This commit is contained in:
Steven Hardy 2013-11-29 14:50:07 +00:00
parent 1aa3ce7460
commit 3a1e7838ff
5 changed files with 40 additions and 8 deletions

View File

@ -1,4 +1,6 @@
{
"context_is_admin": "role:admin",
"deny_stack_user": "not role:heat_stack_user",
"cloudformation:ListStacks": "rule:deny_stack_user",
"cloudformation:CreateStack": "rule:deny_stack_user",

View File

@ -17,6 +17,7 @@ from oslo.config import cfg
from heat.openstack.common import local
from heat.common import exception
from heat.common import policy
from heat.common import wsgi
from heat.openstack.common import context
from heat.openstack.common import importutils
@ -36,7 +37,7 @@ class RequestContext(context.RequestContext):
def __init__(self, auth_token=None, username=None, password=None,
aws_creds=None, tenant=None,
tenant_id=None, auth_url=None, roles=None, is_admin=False,
tenant_id=None, auth_url=None, roles=None, is_admin=None,
read_only=False, show_deleted=False,
owner_is_tenant=True, overwrite=True,
trust_id=None, trustor_user_id=None,
@ -67,6 +68,12 @@ class RequestContext(context.RequestContext):
self._session = None
self.trust_id = trust_id
self.trustor_user_id = trustor_user_id
self.policy = policy.Enforcer()
if is_admin is None:
self.is_admin = self.policy.check_is_admin(self)
else:
self.is_admin = is_admin
def update_store(self):
local.store.context = self
@ -108,8 +115,7 @@ def get_admin_context(read_deleted="no"):
class ContextMiddleware(wsgi.Middleware):
opts = [cfg.BoolOpt('owner_is_tenant', default=True),
cfg.StrOpt('admin_role', default='admin')]
opts = [cfg.BoolOpt('owner_is_tenant', default=True)]
def __init__(self, app, conf, **local_conf):
cfg.CONF.register_opts(self.opts)
@ -144,7 +150,7 @@ class ContextMiddleware(wsgi.Middleware):
3. X-Auth-Token is omitted. If we were using Keystone, then the
tokenauth middleware would have rejected the request, so we must be
using NoAuth. In that case, assume that is_admin=True.
using NoAuth.
"""
headers = req.headers
@ -186,8 +192,7 @@ class ContextMiddleware(wsgi.Middleware):
aws_creds=aws_creds,
username=username,
password=password,
auth_url=auth_url, roles=roles,
is_admin=True)
auth_url=auth_url, roles=roles)
def ContextMiddleware_filter_factory(global_conf, **local_conf):

View File

@ -143,6 +143,7 @@ class FakeKeystoneClient(object):
def create_trust_context(self):
return context.RequestContext(username=self.username,
password=self.password,
is_admin=False,
trust_id='atrust',
trustor_user_id='auser123')

View File

@ -12,9 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
from heat.common import context
from heat.tests.common import HeatTestCase
policy_path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
class TestRequestContext(HeatTestCase):
@ -22,11 +27,11 @@ class TestRequestContext(HeatTestCase):
self.ctx = {'username': 'mick',
'trustor_user_id': None,
'auth_token': '123',
'is_admin': True,
'is_admin': False,
'user': 'mick',
'password': 'foo',
'trust_id': None,
'roles': ['arole', 'admin'],
'roles': ['arole', 'notadmin'],
'tenant_id': '456tenant',
'tenant': 'atenant',
'auth_url': 'http://xyz',
@ -62,3 +67,21 @@ class TestRequestContext(HeatTestCase):
override = '%s_override' % k
setattr(ctx, k, override)
self.assertEqual(ctx.to_dict().get(k), override)
def test_get_admin_context(self):
ctx = context.get_admin_context()
self.assertTrue(ctx.is_admin)
def test_admin_context_policy_true(self):
policy_check = 'heat.common.policy.Enforcer.check_is_admin'
with mock.patch(policy_check) as pc:
pc.return_value = True
ctx = context.RequestContext(roles=['admin'])
self.assertTrue(ctx.is_admin)
def test_admin_context_policy_false(self):
policy_check = 'heat.common.policy.Enforcer.check_is_admin'
with mock.patch(policy_check) as pc:
pc.return_value = False
ctx = context.RequestContext(roles=['notadmin'])
self.assertFalse(ctx.is_admin)

View File

@ -143,6 +143,7 @@ def dummy_context(user='test_username', tenant_id='test_tenant_id',
'username': user,
'password': password,
'roles': roles,
'is_admin': False,
'auth_url': 'http://server.test:5000/v2.0',
'auth_token': 'abcd1234'
})