Merge "Copy AccessInfo tests from keystoneclient"
This commit is contained in:
commit
8204cfe350
@ -379,7 +379,7 @@ class AccessInfoV2(AccessInfo):
|
||||
_service_catalog_class = service_catalog.ServiceCatalogV2
|
||||
|
||||
def has_service_catalog(self):
|
||||
return 'serviceCatalog' in self
|
||||
return 'serviceCatalog' in self._data.get('access', {})
|
||||
|
||||
@_missingproperty
|
||||
def auth_token(self):
|
||||
@ -396,7 +396,7 @@ class AccessInfoV2(AccessInfo):
|
||||
|
||||
@_missingproperty
|
||||
def issued(self):
|
||||
return self._token['issued_at']
|
||||
return utils.parse_isotime(self._token['issued_at'])
|
||||
|
||||
@property
|
||||
def _user(self):
|
||||
@ -420,7 +420,8 @@ class AccessInfoV2(AccessInfo):
|
||||
|
||||
@_missingproperty
|
||||
def role_ids(self):
|
||||
return self.get('metadata', {}).get('roles', [])
|
||||
metadata = self._data.get('access', {}).get('metadata', {})
|
||||
return metadata.get('roles', [])
|
||||
|
||||
@_missingproperty
|
||||
def role_names(self):
|
||||
@ -471,7 +472,7 @@ class AccessInfoV2(AccessInfo):
|
||||
def trust_id(self):
|
||||
return self._trust['id']
|
||||
|
||||
@property
|
||||
@_missingproperty
|
||||
def trust_scoped(self):
|
||||
return bool(self._trust)
|
||||
|
||||
|
201
keystoneauth1/tests/unit/access/test_v2_access.py
Normal file
201
keystoneauth1/tests/unit/access/test_v2_access.py
Normal file
@ -0,0 +1,201 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from keystoneauth1 import access
|
||||
from keystoneauth1 import fixture
|
||||
from keystoneauth1.tests.unit import utils
|
||||
|
||||
|
||||
class AccessV2Test(utils.TestCase):
|
||||
|
||||
def test_building_unscoped_accessinfo(self):
|
||||
token = fixture.V2Token(expires='2012-10-03T16:58:01Z')
|
||||
|
||||
auth_ref = access.create(body=token)
|
||||
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
self.assertFalse(auth_ref.has_service_catalog())
|
||||
|
||||
self.assertEqual(auth_ref.auth_token, token.token_id)
|
||||
self.assertEqual(auth_ref.username, token.user_name)
|
||||
self.assertEqual(auth_ref.user_id, token.user_id)
|
||||
|
||||
self.assertEqual(auth_ref.role_ids, [])
|
||||
self.assertEqual(auth_ref.role_names, [])
|
||||
|
||||
self.assertIsNone(auth_ref.tenant_name)
|
||||
self.assertIsNone(auth_ref.tenant_id)
|
||||
|
||||
self.assertFalse(auth_ref.domain_scoped)
|
||||
self.assertFalse(auth_ref.project_scoped)
|
||||
self.assertFalse(auth_ref.trust_scoped)
|
||||
|
||||
self.assertIsNone(auth_ref.project_domain_id)
|
||||
self.assertIsNone(auth_ref.project_domain_name)
|
||||
self.assertIsNone(auth_ref.user_domain_id)
|
||||
self.assertIsNone(auth_ref.user_domain_name)
|
||||
|
||||
self.assertEqual(auth_ref.expires, token.expires)
|
||||
self.assertEqual(auth_ref.issued, token.issued)
|
||||
|
||||
self.assertEqual(token.audit_id, auth_ref.audit_id)
|
||||
self.assertIsNone(auth_ref.audit_chain_id)
|
||||
self.assertIsNone(token.audit_chain_id)
|
||||
|
||||
def test_will_expire_soon(self):
|
||||
token = fixture.V2Token()
|
||||
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
|
||||
token.expires = expires
|
||||
auth_ref = access.create(body=token)
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
|
||||
self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
|
||||
self.assertFalse(auth_ref.will_expire_soon())
|
||||
|
||||
def test_building_scoped_accessinfo(self):
|
||||
token = fixture.V2Token()
|
||||
token.set_scope()
|
||||
s = token.add_service('identity')
|
||||
s.add_endpoint('http://url')
|
||||
|
||||
role_data = token.add_role()
|
||||
|
||||
auth_ref = access.create(body=token)
|
||||
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
self.assertTrue(auth_ref.has_service_catalog())
|
||||
|
||||
self.assertEqual(auth_ref.auth_token, token.token_id)
|
||||
self.assertEqual(auth_ref.username, token.user_name)
|
||||
self.assertEqual(auth_ref.user_id, token.user_id)
|
||||
|
||||
self.assertEqual(auth_ref.role_ids, [role_data['id']])
|
||||
self.assertEqual(auth_ref.role_names, [role_data['name']])
|
||||
|
||||
self.assertEqual(auth_ref.tenant_name, token.tenant_name)
|
||||
self.assertEqual(auth_ref.tenant_id, token.tenant_id)
|
||||
|
||||
self.assertEqual(auth_ref.tenant_name, auth_ref.project_name)
|
||||
self.assertEqual(auth_ref.tenant_id, auth_ref.project_id)
|
||||
|
||||
self.assertIsNone(auth_ref.project_domain_id, 'default')
|
||||
self.assertIsNone(auth_ref.project_domain_name, 'Default')
|
||||
self.assertIsNone(auth_ref.user_domain_id, 'default')
|
||||
self.assertIsNone(auth_ref.user_domain_name, 'Default')
|
||||
|
||||
self.assertTrue(auth_ref.project_scoped)
|
||||
self.assertFalse(auth_ref.domain_scoped)
|
||||
|
||||
self.assertEqual(token.audit_id, auth_ref.audit_id)
|
||||
self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id)
|
||||
|
||||
def test_diablo_token(self):
|
||||
diablo_token = {
|
||||
'access': {
|
||||
'token': {
|
||||
'id': uuid.uuid4().hex,
|
||||
'expires': '2020-01-01T00:00:10.000123Z',
|
||||
'tenantId': 'tenant_id1',
|
||||
},
|
||||
'user': {
|
||||
'id': 'user_id1',
|
||||
'name': 'user_name1',
|
||||
'roles': [
|
||||
{'name': 'role1'},
|
||||
{'name': 'role2'},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
auth_ref = access.create(body=diablo_token)
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
|
||||
self.assertTrue(auth_ref)
|
||||
self.assertEqual(auth_ref.username, 'user_name1')
|
||||
self.assertEqual(auth_ref.project_id, 'tenant_id1')
|
||||
self.assertEqual(auth_ref.project_name, 'tenant_id1')
|
||||
self.assertIsNone(auth_ref.project_domain_id)
|
||||
self.assertIsNone(auth_ref.project_domain_name)
|
||||
self.assertIsNone(auth_ref.user_domain_id)
|
||||
self.assertIsNone(auth_ref.user_domain_name)
|
||||
self.assertEqual(auth_ref.role_names, ['role1', 'role2'])
|
||||
|
||||
def test_grizzly_token(self):
|
||||
grizzly_token = {
|
||||
'access': {
|
||||
'token': {
|
||||
'id': uuid.uuid4().hex,
|
||||
'expires': '2020-01-01T00:00:10.000123Z',
|
||||
},
|
||||
'user': {
|
||||
'id': 'user_id1',
|
||||
'name': 'user_name1',
|
||||
'tenantId': 'tenant_id1',
|
||||
'tenantName': 'tenant_name1',
|
||||
'roles': [
|
||||
{'name': 'role1'},
|
||||
{'name': 'role2'},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
auth_ref = access.create(body=grizzly_token)
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
|
||||
self.assertEqual(auth_ref.project_id, 'tenant_id1')
|
||||
self.assertEqual(auth_ref.project_name, 'tenant_name1')
|
||||
self.assertIsNone(auth_ref.project_domain_id)
|
||||
self.assertIsNone(auth_ref.project_domain_name)
|
||||
self.assertIsNone(auth_ref.user_domain_id, 'default')
|
||||
self.assertIsNone(auth_ref.user_domain_name, 'Default')
|
||||
self.assertEqual(auth_ref.role_names, ['role1', 'role2'])
|
||||
|
||||
def test_v2_roles(self):
|
||||
role_id = 'a'
|
||||
role_name = 'b'
|
||||
|
||||
token = fixture.V2Token()
|
||||
token.set_scope()
|
||||
token.add_role(id=role_id, name=role_name)
|
||||
|
||||
auth_ref = access.create(body=token)
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
|
||||
self.assertEqual([role_id], auth_ref.role_ids)
|
||||
self.assertEqual([role_id],
|
||||
auth_ref._data['access']['metadata']['roles'])
|
||||
self.assertEqual([role_name], auth_ref.role_names)
|
||||
self.assertEqual([{'name': role_name}],
|
||||
auth_ref._data['access']['user']['roles'])
|
||||
|
||||
def test_trusts(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
trust_id = uuid.uuid4().hex
|
||||
|
||||
token = fixture.V2Token(user_id=user_id, trust_id=trust_id)
|
||||
token.set_scope()
|
||||
token.add_role()
|
||||
|
||||
auth_ref = access.create(body=token)
|
||||
self.assertIsInstance(auth_ref, access.AccessInfoV2)
|
||||
|
||||
self.assertEqual(trust_id, auth_ref.trust_id)
|
||||
self.assertEqual(user_id, auth_ref.trustee_user_id)
|
||||
|
||||
self.assertEqual(trust_id, token['access']['trust']['id'])
|
186
keystoneauth1/tests/unit/access/test_v3_access.py
Normal file
186
keystoneauth1/tests/unit/access/test_v3_access.py
Normal file
@ -0,0 +1,186 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
import datetime
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from keystoneauth1 import access
|
||||
from keystoneauth1 import fixture
|
||||
from keystoneauth1.tests.unit import utils
|
||||
|
||||
|
||||
class AccessV3Test(utils.TestCase):
|
||||
|
||||
def test_building_unscoped_accessinfo(self):
|
||||
token = fixture.V3Token()
|
||||
token_id = uuid.uuid4().hex
|
||||
|
||||
auth_ref = access.create(body=token, auth_token=token_id)
|
||||
|
||||
self.assertIn('methods', auth_ref._data['token'])
|
||||
self.assertFalse(auth_ref.has_service_catalog())
|
||||
self.assertNotIn('catalog', auth_ref._data['token'])
|
||||
|
||||
self.assertEqual(token_id, auth_ref.auth_token)
|
||||
self.assertEqual(token.user_name, auth_ref.username)
|
||||
self.assertEqual(token.user_id, auth_ref.user_id)
|
||||
|
||||
self.assertEqual(auth_ref.role_ids, [])
|
||||
self.assertEqual(auth_ref.role_names, [])
|
||||
|
||||
self.assertIsNone(auth_ref.project_name)
|
||||
self.assertIsNone(auth_ref.project_id)
|
||||
|
||||
self.assertFalse(auth_ref.domain_scoped)
|
||||
self.assertFalse(auth_ref.project_scoped)
|
||||
|
||||
self.assertEqual(token.user_domain_id, auth_ref.user_domain_id)
|
||||
self.assertEqual(token.user_domain_name, auth_ref.user_domain_name)
|
||||
|
||||
self.assertIsNone(auth_ref.project_domain_id)
|
||||
self.assertIsNone(auth_ref.project_domain_name)
|
||||
|
||||
self.assertEqual(auth_ref.expires, timeutils.parse_isotime(
|
||||
token['token']['expires_at']))
|
||||
self.assertEqual(auth_ref.issued, timeutils.parse_isotime(
|
||||
token['token']['issued_at']))
|
||||
|
||||
self.assertEqual(auth_ref.expires, token.expires)
|
||||
self.assertEqual(auth_ref.issued, token.issued)
|
||||
|
||||
self.assertEqual(auth_ref.audit_id, token.audit_id)
|
||||
self.assertIsNone(auth_ref.audit_chain_id)
|
||||
self.assertIsNone(token.audit_chain_id)
|
||||
|
||||
def test_will_expire_soon(self):
|
||||
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
|
||||
token = fixture.V3Token(expires=expires)
|
||||
auth_ref = access.create(body=token)
|
||||
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
|
||||
self.assertTrue(auth_ref.will_expire_soon(stale_duration=301))
|
||||
self.assertFalse(auth_ref.will_expire_soon())
|
||||
|
||||
def test_building_domain_scoped_accessinfo(self):
|
||||
token = fixture.V3Token()
|
||||
token.set_domain_scope()
|
||||
|
||||
s = token.add_service(type='identity')
|
||||
s.add_standard_endpoints(public='http://url')
|
||||
|
||||
token_id = uuid.uuid4().hex
|
||||
|
||||
auth_ref = access.create(body=token, auth_token=token_id)
|
||||
|
||||
self.assertTrue(auth_ref)
|
||||
self.assertIn('methods', auth_ref._data['token'])
|
||||
self.assertIn('catalog', auth_ref._data['token'])
|
||||
self.assertTrue(auth_ref.has_service_catalog())
|
||||
self.assertTrue(auth_ref._data['token']['catalog'])
|
||||
|
||||
self.assertEqual(token_id, auth_ref.auth_token)
|
||||
self.assertEqual(token.user_name, auth_ref.username)
|
||||
self.assertEqual(token.user_id, auth_ref.user_id)
|
||||
|
||||
self.assertEqual(token.role_ids, auth_ref.role_ids)
|
||||
self.assertEqual(token.role_names, auth_ref.role_names)
|
||||
|
||||
self.assertEqual(token.domain_name, auth_ref.domain_name)
|
||||
self.assertEqual(token.domain_id, auth_ref.domain_id)
|
||||
|
||||
self.assertIsNone(auth_ref.project_name)
|
||||
self.assertIsNone(auth_ref.project_id)
|
||||
|
||||
self.assertEqual(token.user_domain_id, auth_ref.user_domain_id)
|
||||
self.assertEqual(token.user_domain_name, auth_ref.user_domain_name)
|
||||
|
||||
self.assertIsNone(auth_ref.project_domain_id)
|
||||
self.assertIsNone(auth_ref.project_domain_name)
|
||||
|
||||
self.assertTrue(auth_ref.domain_scoped)
|
||||
self.assertFalse(auth_ref.project_scoped)
|
||||
|
||||
self.assertEqual(token.audit_id, auth_ref.audit_id)
|
||||
self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id)
|
||||
|
||||
def test_building_project_scoped_accessinfo(self):
|
||||
token = fixture.V3Token()
|
||||
token.set_project_scope()
|
||||
|
||||
s = token.add_service(type='identity')
|
||||
s.add_standard_endpoints(public='http://url')
|
||||
|
||||
token_id = uuid.uuid4().hex
|
||||
|
||||
auth_ref = access.create(body=token, auth_token=token_id)
|
||||
|
||||
self.assertIn('methods', auth_ref._data['token'])
|
||||
self.assertIn('catalog', auth_ref._data['token'])
|
||||
self.assertTrue(auth_ref.has_service_catalog())
|
||||
self.assertTrue(auth_ref._data['token']['catalog'])
|
||||
|
||||
self.assertEqual(token_id, auth_ref.auth_token)
|
||||
self.assertEqual(token.user_name, auth_ref.username)
|
||||
self.assertEqual(token.user_id, auth_ref.user_id)
|
||||
|
||||
self.assertEqual(token.role_ids, auth_ref.role_ids)
|
||||
self.assertEqual(token.role_names, auth_ref.role_names)
|
||||
|
||||
self.assertIsNone(auth_ref.domain_name)
|
||||
self.assertIsNone(auth_ref.domain_id)
|
||||
|
||||
self.assertEqual(token.project_name, auth_ref.project_name)
|
||||
self.assertEqual(token.project_id, auth_ref.project_id)
|
||||
|
||||
self.assertEqual(auth_ref.tenant_name, auth_ref.project_name)
|
||||
self.assertEqual(auth_ref.tenant_id, auth_ref.project_id)
|
||||
|
||||
self.assertEqual(token.project_domain_id, auth_ref.project_domain_id)
|
||||
self.assertEqual(token.project_domain_name,
|
||||
auth_ref.project_domain_name)
|
||||
|
||||
self.assertEqual(token.user_domain_id, auth_ref.user_domain_id)
|
||||
self.assertEqual(token.user_domain_name, auth_ref.user_domain_name)
|
||||
|
||||
self.assertFalse(auth_ref.domain_scoped)
|
||||
self.assertTrue(auth_ref.project_scoped)
|
||||
|
||||
self.assertEqual(token.audit_id, auth_ref.audit_id)
|
||||
self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id)
|
||||
|
||||
def test_oauth_access(self):
|
||||
consumer_id = uuid.uuid4().hex
|
||||
access_token_id = uuid.uuid4().hex
|
||||
|
||||
token = fixture.V3Token()
|
||||
token.set_project_scope()
|
||||
token.set_oauth(access_token_id=access_token_id,
|
||||
consumer_id=consumer_id)
|
||||
|
||||
auth_ref = access.create(body=token)
|
||||
|
||||
self.assertEqual(consumer_id, auth_ref.oauth_consumer_id)
|
||||
self.assertEqual(access_token_id, auth_ref.oauth_access_token_id)
|
||||
|
||||
self.assertEqual(consumer_id,
|
||||
auth_ref._data['token']['OS-OAUTH1']['consumer_id'])
|
||||
self.assertEqual(
|
||||
access_token_id,
|
||||
auth_ref._data['token']['OS-OAUTH1']['access_token_id'])
|
||||
|
||||
def test_federated_property_standard_token(self):
|
||||
"""Check if is_federated property returns expected value."""
|
||||
token = fixture.V3Token()
|
||||
token.set_project_scope()
|
||||
auth_ref = access.create(body=token)
|
||||
self.assertFalse(auth_ref.is_federated)
|
Loading…
Reference in New Issue
Block a user