Merge "Fix policy enforcement to properly detect admin"

This commit is contained in:
Jenkins
2015-01-10 02:43:05 +00:00
committed by Gerrit Code Review
2 changed files with 154 additions and 17 deletions

View File

@@ -67,28 +67,22 @@ class ContextHook(hooks.PecanHook):
def before(self, state):
headers = state.request.headers
user_id = headers.get('X-User-Id')
user_id = headers.get('X-User', user_id)
tenant = headers.get('X-Tenant-Id')
tenant = headers.get('X-Tenant', tenant)
domain_id = headers.get('X-User-Domain-Id')
domain_name = headers.get('X-User-Domain-Name')
auth_token = headers.get('X-Auth-Token')
roles = headers.get('X-Roles', '').split(',')
creds = {
'user': headers.get('X-User') or headers.get('X-User-Id'),
'tenant': headers.get('X-Tenant') or headers.get('X-Tenant-Id'),
'domain_id': headers.get('X-User-Domain-Id'),
'domain_name': headers.get('X-User-Domain-Name'),
'auth_token': headers.get('X-Auth-Token'),
'roles': headers.get('X-Roles', '').split(','),
}
is_public_api = state.request.environ.get('is_public_api', False)
creds = dict(headers)
is_admin = policy.enforce('admin_api', creds, creds)
is_public_api = state.request.environ.get('is_public_api', False)
state.request.context = context.RequestContext(
auth_token=auth_token,
user=user_id,
tenant=tenant,
domain_id=domain_id,
domain_name=domain_name,
is_public_api=is_public_api,
is_admin=is_admin,
roles=roles)
is_public_api=is_public_api,
**creds)
class RPCHook(hooks.PecanHook):

View File

@@ -19,11 +19,78 @@ import json
import mock
from oslo.config import cfg
from oslo import messaging
from webob import exc as webob_exc
from ironic.api.controllers import root
from ironic.api import hooks
from ironic.common import context
from ironic.tests.api import base
class FakeRequest(object):
def __init__(self, headers, context, environ):
self.headers = headers
self.context = context
self.environ = environ or {}
class FakeRequestState(object):
def __init__(self, headers=None, context=None, environ=None):
self.request = FakeRequest(headers, context, environ)
def set_context(self):
headers = self.request.headers
creds = {
'user': headers.get('X-User') or headers.get('X-User-Id'),
'tenant': headers.get('X-Tenant') or headers.get('X-Tenant-Id'),
'domain_id': headers.get('X-User-Domain-Id'),
'domain_name': headers.get('X-User-Domain-Name'),
'auth_token': headers.get('X-Auth-Token'),
'roles': headers.get('X-Roles', '').split(','),
}
is_admin = ('admin' in creds['roles'] or
'administrator' in creds['roles'])
is_public_api = self.request.environ.get('is_public_api', False)
self.request.context = context.RequestContext(
is_admin=is_admin, is_public_api=is_public_api, **creds)
def fake_headers(admin=False):
headers = {
'X-Auth-Token': '8d9f235ca7464dd7ba46f81515797ea0',
'X-Domain-Id': 'None',
'X-Domain-Name': 'None',
'X-Project-Domain-Id': 'default',
'X-Project-Domain-Name': 'Default',
'X-Project-Id': 'b4efa69d4ffa4973863f2eefc094f7f8',
'X-Project-Name': 'admin',
'X-Role': '_member_,admin',
'X-Roles': '_member_,admin',
'X-Tenant': 'foo',
'X-Tenant-Id': 'b4efa69d4ffa4973863f2eefc094f7f8',
'X-Tenant-Name': 'foo',
'X-User': 'foo',
'X-User-Domain-Id': 'default',
'X-User-Domain-Name': 'Default',
'X-User-Id': '604ab2a197c442c2a84aba66708a9e1e',
'X-User-Name': 'foo'
}
if admin:
headers.update({
'X-Project-Name': 'admin',
'X-Role': '_member_,admin',
'X-Roles': '_member_,admin',
})
else:
headers.update({
'X-Project-Name': 'foo',
'X-Role': '_member_',
'X-Roles': '_member_',
})
return headers
class TestNoExceptionTracebackHook(base.FunctionalTest):
TRACE = [u'Traceback (most recent call last):',
@@ -96,3 +163,79 @@ class TestNoExceptionTracebackHook(base.FunctionalTest):
actual_msg = json.loads(
response.json['error_message'])['faultstring']
self.assertEqual(self.MSG_WITH_TRACE, actual_msg)
class TestContextHook(base.FunctionalTest):
@mock.patch.object(context, 'RequestContext')
def test_context_hook_not_admin(self, mock_ctx):
headers = fake_headers(admin=False)
reqstate = FakeRequestState(headers=headers)
context_hook = hooks.ContextHook(None)
context_hook.before(reqstate)
mock_ctx.assert_called_with(
auth_token=headers['X-Auth-Token'],
user=headers['X-User'],
tenant=headers['X-Tenant'],
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=False,
is_admin=False,
roles=headers['X-Roles'].split(','))
@mock.patch.object(context, 'RequestContext')
def test_context_hook_admin(self, mock_ctx):
headers = fake_headers(admin=True)
reqstate = FakeRequestState(headers=headers)
context_hook = hooks.ContextHook(None)
context_hook.before(reqstate)
mock_ctx.assert_called_with(
auth_token=headers['X-Auth-Token'],
user=headers['X-User'],
tenant=headers['X-Tenant'],
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=False,
is_admin=True,
roles=headers['X-Roles'].split(','))
@mock.patch.object(context, 'RequestContext')
def test_context_hook_public_api(self, mock_ctx):
headers = fake_headers(admin=True)
env = {'is_public_api': True}
reqstate = FakeRequestState(headers=headers, environ=env)
context_hook = hooks.ContextHook(None)
context_hook.before(reqstate)
mock_ctx.assert_called_with(
auth_token=headers['X-Auth-Token'],
user=headers['X-User'],
tenant=headers['X-Tenant'],
domain_id=headers['X-User-Domain-Id'],
domain_name=headers['X-User-Domain-Name'],
is_public_api=True,
is_admin=True,
roles=headers['X-Roles'].split(','))
class TestTrustedCallHook(base.FunctionalTest):
def test_trusted_call_hook_not_admin(self):
headers = fake_headers(admin=False)
reqstate = FakeRequestState(headers=headers)
reqstate.set_context()
trusted_call_hook = hooks.TrustedCallHook()
self.assertRaises(webob_exc.HTTPForbidden,
trusted_call_hook.before, reqstate)
def test_trusted_call_hook_admin(self):
headers = fake_headers(admin=True)
reqstate = FakeRequestState(headers=headers)
reqstate.set_context()
trusted_call_hook = hooks.TrustedCallHook()
trusted_call_hook.before(reqstate)
def test_trusted_call_hook_public_api(self):
headers = fake_headers(admin=False)
env = {'is_public_api': True}
reqstate = FakeRequestState(headers=headers, environ=env)
reqstate.set_context()
trusted_call_hook = hooks.TrustedCallHook()
trusted_call_hook.before(reqstate)