Prohibit V3 V2 token intermix for resource in non-default domain (bug 1157430)

Change-Id: Ibe9019684b45651a9679311a3bacdad41b4116f5
This commit is contained in:
Guang Yee 2013-03-19 19:14:47 -07:00
parent 5cb8e1f2e5
commit 550973b64a
3 changed files with 277 additions and 10 deletions

View File

@ -473,6 +473,46 @@ class Auth(controller.V2Controller):
_('Token does not belong to specified tenant.'))
return data
def _assert_default_domain(self, context, token_ref):
""" Make sure we are operating on default domain only. """
if token_ref.get('token_data'):
# this is a V3 token
msg = _('Non-default domain is not supported')
# user in a non-default is prohibited
if (token_ref['token_data']['token']['user']['domain']['id'] !=
DEFAULT_DOMAIN_ID):
raise exception.Unauthorized(msg)
# domain scoping is prohibited
if token_ref['token_data']['token'].get('domain'):
raise exception.Unauthorized(
_('Domain scoped token is not supported'))
# project in non-default domain is prohibited
if token_ref['token_data']['token'].get('project'):
project = token_ref['token_data']['token']['project']
project_domain_id = project['domain']['id']
# scoped to project in non-default domain is prohibited
if project_domain_id != DEFAULT_DOMAIN_ID:
raise exception.Unauthorized(msg)
# if token is scoped to trust, both trustor and trustee must
# be in the default domain. Furthermore, the delegated project
# must also be in the default domain
metadata_ref = token_ref['metadata']
if 'trust_id' in metadata_ref:
trust_ref = self.trust_api.get_trust(context,
metadata_ref['trust_id'])
trustee_user_ref = self.identity_api.get_user(
context, trust_ref['trustee_user_id'])
if trustee_user_ref['domain_id'] != DEFAULT_DOMAIN_ID:
raise exception.Unauthorized(msg)
trustor_user_ref = self.identity_api.get_user(
context, trust_ref['trustor_user_id'])
if trustor_user_ref['domain_id'] != DEFAULT_DOMAIN_ID:
raise exception.Unauthorized(msg)
project_ref = self.identity_api.get_project(
context, trust_ref['project_id'])
if project_ref['domain_id'] != DEFAULT_DOMAIN_ID:
raise exception.Unauthorized(msg)
# admin only
def validate_token_head(self, context, token_id):
"""Check that a token is valid.
@ -483,7 +523,9 @@ class Auth(controller.V2Controller):
"""
belongs_to = context['query_string'].get('belongsTo')
assert self._get_token_ref(context, token_id, belongs_to)
token_ref = self._get_token_ref(context, token_id, belongs_to)
assert token_ref
self._assert_default_domain(context, token_ref)
# admin only
def validate_token(self, context, token_id):
@ -496,6 +538,7 @@ class Auth(controller.V2Controller):
"""
belongs_to = context['query_string'].get('belongsTo')
token_ref = self._get_token_ref(context, token_id, belongs_to)
self._assert_default_domain(context, token_ref)
# TODO(termie): optimize this call at some point and put it into the
# the return for metadata

View File

@ -15,6 +15,7 @@ import test_content_types
CONF = config.CONF
DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
@ -56,6 +57,21 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
self.default_domain_project_id = uuid.uuid4().hex
self.default_domain_project = self.new_project_ref(
domain_id=DEFAULT_DOMAIN_ID)
self.default_domain_project['id'] = self.default_domain_project_id
self.identity_api.create_project(self.default_domain_project_id,
self.default_domain_project)
self.default_domain_user_id = uuid.uuid4().hex
self.default_domain_user = self.new_user_ref(
domain_id=DEFAULT_DOMAIN_ID,
project_id=self.default_domain_project_id)
self.default_domain_user['id'] = self.default_domain_user_id
self.identity_api.create_user(self.default_domain_user_id,
self.default_domain_user)
# create & grant policy.json's default role for admin_required
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
@ -64,6 +80,12 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.identity_api.create_role(self.role_id, self.role)
self.identity_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
self.identity_api.add_role_to_user_and_project(
self.default_domain_user_id, self.default_domain_project_id,
self.role_id)
self.identity_api.add_role_to_user_and_project(
self.default_domain_user_id, self.project_id,
self.role_id)
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')

View File

@ -113,7 +113,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
CONF.signing.keyfile)
self.assertEqual(token_signed, token_id)
def test_v3_v2_unscoped_uuid_token_intermix(self):
def test_v3_v2_intermix_non_default_domain_failed(self):
self.opt_in_group('signing', token_format='UUID')
auth_data = self.build_authentication_request(
user_id=self.user['id'],
@ -122,6 +122,59 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
token_data = resp.body
token = resp.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=401)
def test_v3_v2_intermix_domain_scoped_token_failed(self):
self.opt_in_group('signing', token_format='UUID')
# grant the domain role to user
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=401)
def test_v3_v2_intermix_non_default_project_failed(self):
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.project['id'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=401)
def test_v3_v2_unscoped_uuid_token_intermix(self):
self.opt_in_group('signing', token_format='UUID')
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
@ -138,8 +191,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
def test_v3_v2_unscoped_pki_token_intermix(self):
self.opt_in_group('signing', token_format='PKI')
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
@ -162,9 +215,9 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# data is baked into the token itself.
self.opt_in_group('signing', token_format='UUID')
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
@ -189,9 +242,9 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# data is baked into the token itself.
self.opt_in_group('signing', token_format='PKI')
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
@ -1091,6 +1144,155 @@ class TestTrustAuth(TestAuthInfo):
trust_id=trust['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.default_domain_user_id,
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
r = self.post('/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, self.default_domain_user)
token = r.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=401)
def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self):
ref = self.new_trust_ref(
trustor_user_id=self.default_domain_user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.default_domain_project_id,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
r = self.post('/auth/tokens', body=auth_data)
token = r.getheader('X-Subject-Token')
r = self.post('/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, self.trustee_user)
token = r.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=401)
def test_v3_v2_intermix_project_not_in_default_domaini_failed(self):
# create a trustee in default domain to delegate stuff to
trustee_user_id = uuid.uuid4().hex
trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID)
trustee_user['id'] = trustee_user_id
self.identity_api.create_user(trustee_user_id, trustee_user)
ref = self.new_trust_ref(
trustor_user_id=self.default_domain_user_id,
trustee_user_id=trustee_user_id,
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
r = self.post('/auth/tokens', body=auth_data)
token = r.getheader('X-Subject-Token')
r = self.post('/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=trustee_user['id'],
password=trustee_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, trustee_user)
token = r.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=401)
def test_v3_v2_intermix(self):
# create a trustee in default domain to delegate stuff to
trustee_user_id = uuid.uuid4().hex
trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID)
trustee_user['id'] = trustee_user_id
self.identity_api.create_user(trustee_user_id, trustee_user)
ref = self.new_trust_ref(
trustor_user_id=self.default_domain_user_id,
trustee_user_id=trustee_user_id,
project_id=self.default_domain_project_id,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project_id)
r = self.post('/auth/tokens', body=auth_data)
token = r.getheader('X-Subject-Token')
r = self.post('/trusts', body={'trust': ref}, token=token)
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=trustee_user['id'],
password=trustee_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectTrustScopedTokenResponse(
r, trustee_user)
token = r.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET',
expected_status=200)
def test_exercise_trust_scoped_token_without_impersonation(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,