Add trust users to AccessInfo and fixture
A trust should always contain a trustee_user_id and a trustor_user_id. Expose these values via AccessInfo if available. Change-Id: Ic46a44300e6bf8aa694f1543d470c16fcac643fc
This commit is contained in:
@@ -263,6 +263,22 @@ class AccessInfo(dict):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def trustee_user_id(self):
|
||||
"""Returns the trustee user id associated with a trust.
|
||||
|
||||
:returns: str or None (if no trust associated with the token)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def trustor_user_id(self):
|
||||
"""Returns the trustor user id associated with a trust.
|
||||
|
||||
:returns: str or None (if no trust associated with the token)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
"""Returns the project ID associated with the authentication
|
||||
@@ -468,6 +484,15 @@ class AccessInfoV2(AccessInfo):
|
||||
def trust_scoped(self):
|
||||
return 'trust' in self
|
||||
|
||||
@property
|
||||
def trustee_user_id(self):
|
||||
return self.get('trust', {}).get('trustee_user_id')
|
||||
|
||||
@property
|
||||
def trustor_user_id(self):
|
||||
# this information is not available in the v2 token bug: #1331882
|
||||
return None
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
try:
|
||||
@@ -649,6 +674,14 @@ class AccessInfoV3(AccessInfo):
|
||||
def trust_scoped(self):
|
||||
return 'OS-TRUST:trust' in self
|
||||
|
||||
@property
|
||||
def trustee_user_id(self):
|
||||
return self.get('OS-TRUST:trust', {}).get('trustee_user', {}).get('id')
|
||||
|
||||
@property
|
||||
def trustor_user_id(self):
|
||||
return self.get('OS-TRUST:trust', {}).get('trustor_user', {}).get('id')
|
||||
|
||||
@property
|
||||
def auth_url(self):
|
||||
# FIXME(jamielennox): this is deprecated in favour of retrieving it
|
||||
|
@@ -42,7 +42,7 @@ class Token(dict):
|
||||
|
||||
def __init__(self, token_id=None, expires=None, issued=None,
|
||||
tenant_id=None, tenant_name=None, user_id=None,
|
||||
user_name=None):
|
||||
user_name=None, trust_id=None, trustee_user_id=None):
|
||||
super(Token, self).__init__()
|
||||
|
||||
self.token_id = token_id or uuid.uuid4().hex
|
||||
@@ -69,6 +69,12 @@ class Token(dict):
|
||||
if tenant_id or tenant_name:
|
||||
self.set_scope(tenant_id, tenant_name)
|
||||
|
||||
if trust_id or trustee_user_id:
|
||||
# the trustee_user_id will generally be the same as the user_id as
|
||||
# the token is being issued to the trustee
|
||||
self.set_trust(id=trust_id,
|
||||
trustee_user_id=trustee_user_id or user_id)
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
return self.setdefault('access', {})
|
||||
@@ -157,6 +163,22 @@ class Token(dict):
|
||||
def _metadata(self):
|
||||
return self.root.setdefault('metadata', {})
|
||||
|
||||
@property
|
||||
def trust_id(self):
|
||||
return self.root.setdefault('trust', {})('id')
|
||||
|
||||
@trust_id.setter
|
||||
def trust_id(self, value):
|
||||
self.root.setdefault('trust', {})['id'] = value
|
||||
|
||||
@property
|
||||
def trustee_user_id(self):
|
||||
return self.root.setdefault('trust', {}).get('trustee_user_id')
|
||||
|
||||
@trustee_user_id.setter
|
||||
def trustee_user_id(self, value):
|
||||
self.root.setdefault('trust', {})['trustee_user_id'] = value
|
||||
|
||||
def validate(self):
|
||||
scoped = 'tenant' in self.token
|
||||
catalog = self.root.get('serviceCatalog')
|
||||
@@ -186,3 +208,7 @@ class Token(dict):
|
||||
def set_scope(self, id=None, name=None):
|
||||
self.tenant_id = id or uuid.uuid4().hex
|
||||
self.tenant_name = name or uuid.uuid4().hex
|
||||
|
||||
def set_trust(self, id=None, trustee_user_id=None):
|
||||
self.trust_id = id or uuid.uuid4().hex
|
||||
self.trustee_user_id = trustee_user_id or uuid.uuid4().hex
|
||||
|
@@ -11,6 +11,7 @@
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import testresources
|
||||
|
||||
@@ -149,6 +150,21 @@ class AccessInfoTest(utils.TestCase, testresources.ResourcedTestCase):
|
||||
self.assertEqual([role_name], auth_ref.role_names)
|
||||
self.assertEqual([{'name': role_name}], auth_ref['user']['roles'])
|
||||
|
||||
def test_trusts(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
trust_id = uuid.uuid4().hex
|
||||
|
||||
token = fixture.V2Token(user_id=user_id, trust_id=trust_id)
|
||||
token.set_scope()
|
||||
token.add_role()
|
||||
|
||||
auth_ref = access.AccessInfo.factory(body=token)
|
||||
|
||||
self.assertEqual(trust_id, auth_ref.trust_id)
|
||||
self.assertEqual(user_id, auth_ref.trustee_user_id)
|
||||
|
||||
self.assertEqual(trust_id, token['access']['trust']['id'])
|
||||
|
||||
|
||||
def load_tests(loader, tests, pattern):
|
||||
return testresources.OptimisingTestSuite(tests)
|
||||
|
@@ -213,6 +213,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
|
||||
auth_url=self.TEST_URL)
|
||||
self.assertTrue(cs.auth_ref.trust_scoped)
|
||||
self.assertEqual(cs.auth_ref.trust_id, self.TEST_TRUST_ID)
|
||||
self.assertEqual(cs.auth_ref.trustee_user_id, self.TEST_USER)
|
||||
self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)
|
||||
|
||||
@httpretty.activate
|
||||
|
@@ -125,6 +125,8 @@ class KeystoneClientTest(utils.TestCase):
|
||||
self.assertFalse(c.auth_ref.domain_scoped)
|
||||
self.assertFalse(c.auth_ref.project_scoped)
|
||||
self.assertEqual(c.auth_ref.trust_id, 'fe0aef')
|
||||
self.assertEqual(c.auth_ref.trustee_user_id, '0ca8f6')
|
||||
self.assertEqual(c.auth_ref.trustor_user_id, 'bd263c')
|
||||
self.assertTrue(c.auth_ref.trust_scoped)
|
||||
self.assertEqual(c.auth_user_id, '0ca8f6')
|
||||
|
||||
|
Reference in New Issue
Block a user