Block delegation escalation of privilege

Forbids doing the following with either a trust
  or oauth based token:
  creating a trust
  approving a request_token
  listing request tokens

Change-Id: I1528f9dd003f5e03cbc50b78e1b32dbbf85ffcc2
Closes-Bug:  1324592
This commit is contained in:
Adam Young 2014-05-29 13:56:17 -04:00
parent 2b2c9fbd48
commit 73785122ee
5 changed files with 188 additions and 2 deletions

View File

@ -67,7 +67,7 @@ def is_v3_token(token):
def v3_token_to_auth_context(token): def v3_token_to_auth_context(token):
creds = {} creds = {'is_delegated_auth': False}
token_data = token['token'] token_data = token['token']
try: try:
creds['user_id'] = token_data['user']['id'] creds['user_id'] = token_data['user']['id']
@ -87,11 +87,31 @@ def v3_token_to_auth_context(token):
creds['group_ids'] = [ creds['group_ids'] = [
g['id'] for g in token_data['user'].get(federation.FEDERATION, {}).get( g['id'] for g in token_data['user'].get(federation.FEDERATION, {}).get(
'groups', [])] 'groups', [])]
trust = token_data.get('OS-TRUST:trust')
if trust is None:
creds['trust_id'] = None
creds['trustor_id'] = None
creds['trustee_id'] = None
else:
creds['trust_id'] = trust['id']
creds['trustor_id'] = trust['trustor_user']['id']
creds['trustee_id'] = trust['trustee_user']['id']
creds['is_delegated_auth'] = True
oauth1 = token_data.get('OS-OAUTH1')
if oauth1 is None:
creds['consumer_id'] = None
creds['access_token_id'] = None
else:
creds['consumer_id'] = oauth1['consumer_id']
creds['access_token_id'] = oauth1['access_token_id']
creds['is_delegated_auth'] = True
return creds return creds
def v2_token_to_auth_context(token): def v2_token_to_auth_context(token):
creds = {} creds = {'is_delegated_auth': False}
token_data = token['access'] token_data = token['access']
try: try:
creds['user_id'] = token_data['user']['id'] creds['user_id'] = token_data['user']['id']
@ -105,6 +125,18 @@ def v2_token_to_auth_context(token):
if 'roles' in token_data['user']: if 'roles' in token_data['user']:
creds['roles'] = [role['name'] for creds['roles'] = [role['name'] for
role in token_data['user']['roles']] role in token_data['user']['roles']]
trust = token_data.get('trust')
if trust is None:
creds['trust_id'] = None
creds['trustor_id'] = None
creds['trustee_id'] = None
else:
creds['trust_id'] = trust.get('id')
creds['trustor_id'] = trust.get('trustor_id')
creds['trustee_id'] = trust.get('trustee_id')
creds['is_delegated_auth'] = True
return creds return creds

View File

@ -95,6 +95,12 @@ class AccessTokenCrudV3(controller.V3Controller):
@controller.protected() @controller.protected()
def list_access_tokens(self, context, user_id): def list_access_tokens(self, context, user_id):
auth_context = context.get('environment',
{}).get('KEYSTONE_AUTH_CONTEXT', {})
if auth_context.get('is_delegated_auth'):
raise exception.Forbidden(
_('Cannot list request tokens'
' with a token issued via delegation.'))
refs = self.oauth_api.list_access_tokens(user_id) refs = self.oauth_api.list_access_tokens(user_id)
formatted_refs = ([self._format_token_entity(context, x) formatted_refs = ([self._format_token_entity(context, x)
for x in refs]) for x in refs])
@ -310,6 +316,12 @@ class OAuthControllerV3(controller.V3Controller):
there is not another easy way to make sure the user knows which roles there is not another easy way to make sure the user knows which roles
are being requested before authorizing. are being requested before authorizing.
""" """
auth_context = context.get('environment',
{}).get('KEYSTONE_AUTH_CONTEXT', {})
if auth_context.get('is_delegated_auth'):
raise exception.Forbidden(
_('Cannot authorize a request token'
' with a token issued via delegation.'))
req_token = self.oauth_api.get_request_token(request_token_id) req_token = self.oauth_api.get_request_token(request_token_id)

View File

@ -2777,6 +2777,42 @@ class TestTrustAuth(TestAuthInfo):
self.assertEqual(r.result['token']['project']['name'], self.assertEqual(r.result['token']['project']['name'],
self.project['name']) self.project['name'])
def test_impersonation_token_cannot_create_new_trust(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
trust_token = r.headers['X-Subject-Token']
# Build second trust
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=trust_token,
expected_status=403)
def assertTrustTokensRevoked(self, trust_id): def assertTrustTokensRevoked(self, trust_id):
revocation_response = self.get('/OS-REVOKE/events', revocation_response = self.get('/OS-REVOKE/events',
expected_status=200) expected_status=200)

View File

@ -13,6 +13,8 @@
# under the License. # under the License.
import copy import copy
import os
import tempfile
import uuid import uuid
from six.moves import urllib from six.moves import urllib
@ -26,6 +28,7 @@ from keystone.contrib.oauth1 import controllers
from keystone import exception from keystone import exception
from keystone.openstack.common.db.sqlalchemy import migration from keystone.openstack.common.db.sqlalchemy import migration
from keystone.openstack.common import importutils from keystone.openstack.common import importutils
from keystone.openstack.common import jsonutils
from keystone.tests import test_v3 from keystone.tests import test_v3
@ -486,6 +489,100 @@ class AuthTokenTests(OAuthFlowTests):
self.assertRaises(exception.TokenNotFound, self.token_api.get_token, self.assertRaises(exception.TokenNotFound, self.token_api.get_token,
self.keystone_token_id) self.keystone_token_id)
def _create_trust_get_token(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
trust_token = r.headers['X-Subject-Token']
return trust_token
def _approve_request_token_url(self):
consumer = self._create_single_consumer()
consumer_id = consumer['id']
consumer_secret = consumer['secret']
self.consumer = {'key': consumer_id, 'secret': consumer_secret}
self.assertIsNotNone(self.consumer['secret'])
url, headers = self._create_request_token(self.consumer,
self.project_id)
content = self.post(url, headers=headers)
credentials = urllib.parse.parse_qs(content.result)
request_key = credentials['oauth_token'][0]
request_secret = credentials['oauth_token_secret'][0]
self.request_token = oauth1.Token(request_key, request_secret)
self.assertIsNotNone(self.request_token.key)
url = self._authorize_request_token(request_key)
return url
def test_oauth_token_cannot_create_new_trust(self):
self.test_oauth_flow()
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=self.keystone_token_id,
expected_status=403)
def test_oauth_token_cannot_authorize_request_token(self):
self.test_oauth_flow()
url = self._approve_request_token_url()
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, token=self.keystone_token_id,
expected_status=403)
def test_oauth_token_cannot_list_request_tokens(self):
self._set_policy({"identity:list_access_tokens": [],
"identity:create_consumer": [],
"identity:authorize_request_token": []})
self.test_oauth_flow()
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
self.get(url, token=self.keystone_token_id,
expected_status=403)
def _set_policy(self, new_policy):
_unused, self.tmpfilename = tempfile.mkstemp()
self.config_fixture.config(policy_file=self.tmpfilename)
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(jsonutils.dumps(new_policy))
self.addCleanup(os.remove, self.tmpfilename)
def test_trust_token_cannot_authorize_request_token(self):
trust_token = self._create_trust_get_token()
url = self._approve_request_token_url()
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, token=trust_token, expected_status=403)
def test_trust_token_cannot_list_request_tokens(self):
self._set_policy({"identity:list_access_tokens": [],
"identity:create_trust": []})
trust_token = self._create_trust_get_token()
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
self.get(url, token=trust_token, expected_status=403)
class MaliciousOAuth1Tests(OAuth1Tests): class MaliciousOAuth1Tests(OAuth1Tests):

View File

@ -132,6 +132,15 @@ class TrustV3(controller.V3Controller):
# TODO(ayoung): instead of raising ValidationError on the first # TODO(ayoung): instead of raising ValidationError on the first
# problem, return a collection of all the problems. # problem, return a collection of all the problems.
# Explicitly prevent a trust token from creating a new trust.
auth_context = context.get('environment',
{}).get('KEYSTONE_AUTH_CONTEXT', {})
if auth_context.get('is_delegated_auth'):
raise exception.Forbidden(
_('Cannot create a trust'
' with a token issued via delegation.'))
if not trust: if not trust:
raise exception.ValidationError(attribute='trust', raise exception.ValidationError(attribute='trust',
target='request') target='request')