diff --git a/keystoneauth1/access/access.py b/keystoneauth1/access/access.py index ffb6b9c8..5dd6a337 100644 --- a/keystoneauth1/access/access.py +++ b/keystoneauth1/access/access.py @@ -379,7 +379,7 @@ class AccessInfoV2(AccessInfo): _service_catalog_class = service_catalog.ServiceCatalogV2 def has_service_catalog(self): - return 'serviceCatalog' in self + return 'serviceCatalog' in self._data.get('access', {}) @_missingproperty def auth_token(self): @@ -396,7 +396,7 @@ class AccessInfoV2(AccessInfo): @_missingproperty def issued(self): - return self._token['issued_at'] + return utils.parse_isotime(self._token['issued_at']) @property def _user(self): @@ -420,7 +420,8 @@ class AccessInfoV2(AccessInfo): @_missingproperty def role_ids(self): - return self.get('metadata', {}).get('roles', []) + metadata = self._data.get('access', {}).get('metadata', {}) + return metadata.get('roles', []) @_missingproperty def role_names(self): @@ -471,7 +472,7 @@ class AccessInfoV2(AccessInfo): def trust_id(self): return self._trust['id'] - @property + @_missingproperty def trust_scoped(self): return bool(self._trust) diff --git a/keystoneauth1/tests/unit/access/test_v2_access.py b/keystoneauth1/tests/unit/access/test_v2_access.py new file mode 100644 index 00000000..78227f6a --- /dev/null +++ b/keystoneauth1/tests/unit/access/test_v2_access.py @@ -0,0 +1,201 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from oslo_utils import timeutils + +from keystoneauth1 import access +from keystoneauth1 import fixture +from keystoneauth1.tests.unit import utils + + +class AccessV2Test(utils.TestCase): + + def test_building_unscoped_accessinfo(self): + token = fixture.V2Token(expires='2012-10-03T16:58:01Z') + + auth_ref = access.create(body=token) + + self.assertIsInstance(auth_ref, access.AccessInfoV2) + self.assertFalse(auth_ref.has_service_catalog()) + + self.assertEqual(auth_ref.auth_token, token.token_id) + self.assertEqual(auth_ref.username, token.user_name) + self.assertEqual(auth_ref.user_id, token.user_id) + + self.assertEqual(auth_ref.role_ids, []) + self.assertEqual(auth_ref.role_names, []) + + self.assertIsNone(auth_ref.tenant_name) + self.assertIsNone(auth_ref.tenant_id) + + self.assertFalse(auth_ref.domain_scoped) + self.assertFalse(auth_ref.project_scoped) + self.assertFalse(auth_ref.trust_scoped) + + self.assertIsNone(auth_ref.project_domain_id) + self.assertIsNone(auth_ref.project_domain_name) + self.assertIsNone(auth_ref.user_domain_id) + self.assertIsNone(auth_ref.user_domain_name) + + self.assertEqual(auth_ref.expires, token.expires) + self.assertEqual(auth_ref.issued, token.issued) + + self.assertEqual(token.audit_id, auth_ref.audit_id) + self.assertIsNone(auth_ref.audit_chain_id) + self.assertIsNone(token.audit_chain_id) + + def test_will_expire_soon(self): + token = fixture.V2Token() + expires = timeutils.utcnow() + datetime.timedelta(minutes=5) + token.expires = expires + auth_ref = access.create(body=token) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + self.assertFalse(auth_ref.will_expire_soon(stale_duration=120)) + self.assertTrue(auth_ref.will_expire_soon(stale_duration=300)) + self.assertFalse(auth_ref.will_expire_soon()) + + def test_building_scoped_accessinfo(self): + token = fixture.V2Token() + token.set_scope() + s = token.add_service('identity') + s.add_endpoint('http://url') + + role_data = token.add_role() + + auth_ref = access.create(body=token) + + self.assertIsInstance(auth_ref, access.AccessInfoV2) + self.assertTrue(auth_ref.has_service_catalog()) + + self.assertEqual(auth_ref.auth_token, token.token_id) + self.assertEqual(auth_ref.username, token.user_name) + self.assertEqual(auth_ref.user_id, token.user_id) + + self.assertEqual(auth_ref.role_ids, [role_data['id']]) + self.assertEqual(auth_ref.role_names, [role_data['name']]) + + self.assertEqual(auth_ref.tenant_name, token.tenant_name) + self.assertEqual(auth_ref.tenant_id, token.tenant_id) + + self.assertEqual(auth_ref.tenant_name, auth_ref.project_name) + self.assertEqual(auth_ref.tenant_id, auth_ref.project_id) + + self.assertIsNone(auth_ref.project_domain_id, 'default') + self.assertIsNone(auth_ref.project_domain_name, 'Default') + self.assertIsNone(auth_ref.user_domain_id, 'default') + self.assertIsNone(auth_ref.user_domain_name, 'Default') + + self.assertTrue(auth_ref.project_scoped) + self.assertFalse(auth_ref.domain_scoped) + + self.assertEqual(token.audit_id, auth_ref.audit_id) + self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id) + + def test_diablo_token(self): + diablo_token = { + 'access': { + 'token': { + 'id': uuid.uuid4().hex, + 'expires': '2020-01-01T00:00:10.000123Z', + 'tenantId': 'tenant_id1', + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + }, + } + + auth_ref = access.create(body=diablo_token) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + + self.assertTrue(auth_ref) + self.assertEqual(auth_ref.username, 'user_name1') + self.assertEqual(auth_ref.project_id, 'tenant_id1') + self.assertEqual(auth_ref.project_name, 'tenant_id1') + self.assertIsNone(auth_ref.project_domain_id) + self.assertIsNone(auth_ref.project_domain_name) + self.assertIsNone(auth_ref.user_domain_id) + self.assertIsNone(auth_ref.user_domain_name) + self.assertEqual(auth_ref.role_names, ['role1', 'role2']) + + def test_grizzly_token(self): + grizzly_token = { + 'access': { + 'token': { + 'id': uuid.uuid4().hex, + 'expires': '2020-01-01T00:00:10.000123Z', + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'tenantId': 'tenant_id1', + 'tenantName': 'tenant_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + }, + } + + auth_ref = access.create(body=grizzly_token) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + + self.assertEqual(auth_ref.project_id, 'tenant_id1') + self.assertEqual(auth_ref.project_name, 'tenant_name1') + self.assertIsNone(auth_ref.project_domain_id) + self.assertIsNone(auth_ref.project_domain_name) + self.assertIsNone(auth_ref.user_domain_id, 'default') + self.assertIsNone(auth_ref.user_domain_name, 'Default') + self.assertEqual(auth_ref.role_names, ['role1', 'role2']) + + def test_v2_roles(self): + role_id = 'a' + role_name = 'b' + + token = fixture.V2Token() + token.set_scope() + token.add_role(id=role_id, name=role_name) + + auth_ref = access.create(body=token) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + + self.assertEqual([role_id], auth_ref.role_ids) + self.assertEqual([role_id], + auth_ref._data['access']['metadata']['roles']) + self.assertEqual([role_name], auth_ref.role_names) + self.assertEqual([{'name': role_name}], + auth_ref._data['access']['user']['roles']) + + def test_trusts(self): + user_id = uuid.uuid4().hex + trust_id = uuid.uuid4().hex + + token = fixture.V2Token(user_id=user_id, trust_id=trust_id) + token.set_scope() + token.add_role() + + auth_ref = access.create(body=token) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + + self.assertEqual(trust_id, auth_ref.trust_id) + self.assertEqual(user_id, auth_ref.trustee_user_id) + + self.assertEqual(trust_id, token['access']['trust']['id']) diff --git a/keystoneauth1/tests/unit/access/test_v3_access.py b/keystoneauth1/tests/unit/access/test_v3_access.py new file mode 100644 index 00000000..ebfb997f --- /dev/null +++ b/keystoneauth1/tests/unit/access/test_v3_access.py @@ -0,0 +1,186 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +import datetime +from oslo_utils import timeutils + +from keystoneauth1 import access +from keystoneauth1 import fixture +from keystoneauth1.tests.unit import utils + + +class AccessV3Test(utils.TestCase): + + def test_building_unscoped_accessinfo(self): + token = fixture.V3Token() + token_id = uuid.uuid4().hex + + auth_ref = access.create(body=token, auth_token=token_id) + + self.assertIn('methods', auth_ref._data['token']) + self.assertFalse(auth_ref.has_service_catalog()) + self.assertNotIn('catalog', auth_ref._data['token']) + + self.assertEqual(token_id, auth_ref.auth_token) + self.assertEqual(token.user_name, auth_ref.username) + self.assertEqual(token.user_id, auth_ref.user_id) + + self.assertEqual(auth_ref.role_ids, []) + self.assertEqual(auth_ref.role_names, []) + + self.assertIsNone(auth_ref.project_name) + self.assertIsNone(auth_ref.project_id) + + self.assertFalse(auth_ref.domain_scoped) + self.assertFalse(auth_ref.project_scoped) + + self.assertEqual(token.user_domain_id, auth_ref.user_domain_id) + self.assertEqual(token.user_domain_name, auth_ref.user_domain_name) + + self.assertIsNone(auth_ref.project_domain_id) + self.assertIsNone(auth_ref.project_domain_name) + + self.assertEqual(auth_ref.expires, timeutils.parse_isotime( + token['token']['expires_at'])) + self.assertEqual(auth_ref.issued, timeutils.parse_isotime( + token['token']['issued_at'])) + + self.assertEqual(auth_ref.expires, token.expires) + self.assertEqual(auth_ref.issued, token.issued) + + self.assertEqual(auth_ref.audit_id, token.audit_id) + self.assertIsNone(auth_ref.audit_chain_id) + self.assertIsNone(token.audit_chain_id) + + def test_will_expire_soon(self): + expires = timeutils.utcnow() + datetime.timedelta(minutes=5) + token = fixture.V3Token(expires=expires) + auth_ref = access.create(body=token) + self.assertFalse(auth_ref.will_expire_soon(stale_duration=120)) + self.assertTrue(auth_ref.will_expire_soon(stale_duration=301)) + self.assertFalse(auth_ref.will_expire_soon()) + + def test_building_domain_scoped_accessinfo(self): + token = fixture.V3Token() + token.set_domain_scope() + + s = token.add_service(type='identity') + s.add_standard_endpoints(public='http://url') + + token_id = uuid.uuid4().hex + + auth_ref = access.create(body=token, auth_token=token_id) + + self.assertTrue(auth_ref) + self.assertIn('methods', auth_ref._data['token']) + self.assertIn('catalog', auth_ref._data['token']) + self.assertTrue(auth_ref.has_service_catalog()) + self.assertTrue(auth_ref._data['token']['catalog']) + + self.assertEqual(token_id, auth_ref.auth_token) + self.assertEqual(token.user_name, auth_ref.username) + self.assertEqual(token.user_id, auth_ref.user_id) + + self.assertEqual(token.role_ids, auth_ref.role_ids) + self.assertEqual(token.role_names, auth_ref.role_names) + + self.assertEqual(token.domain_name, auth_ref.domain_name) + self.assertEqual(token.domain_id, auth_ref.domain_id) + + self.assertIsNone(auth_ref.project_name) + self.assertIsNone(auth_ref.project_id) + + self.assertEqual(token.user_domain_id, auth_ref.user_domain_id) + self.assertEqual(token.user_domain_name, auth_ref.user_domain_name) + + self.assertIsNone(auth_ref.project_domain_id) + self.assertIsNone(auth_ref.project_domain_name) + + self.assertTrue(auth_ref.domain_scoped) + self.assertFalse(auth_ref.project_scoped) + + self.assertEqual(token.audit_id, auth_ref.audit_id) + self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id) + + def test_building_project_scoped_accessinfo(self): + token = fixture.V3Token() + token.set_project_scope() + + s = token.add_service(type='identity') + s.add_standard_endpoints(public='http://url') + + token_id = uuid.uuid4().hex + + auth_ref = access.create(body=token, auth_token=token_id) + + self.assertIn('methods', auth_ref._data['token']) + self.assertIn('catalog', auth_ref._data['token']) + self.assertTrue(auth_ref.has_service_catalog()) + self.assertTrue(auth_ref._data['token']['catalog']) + + self.assertEqual(token_id, auth_ref.auth_token) + self.assertEqual(token.user_name, auth_ref.username) + self.assertEqual(token.user_id, auth_ref.user_id) + + self.assertEqual(token.role_ids, auth_ref.role_ids) + self.assertEqual(token.role_names, auth_ref.role_names) + + self.assertIsNone(auth_ref.domain_name) + self.assertIsNone(auth_ref.domain_id) + + self.assertEqual(token.project_name, auth_ref.project_name) + self.assertEqual(token.project_id, auth_ref.project_id) + + self.assertEqual(auth_ref.tenant_name, auth_ref.project_name) + self.assertEqual(auth_ref.tenant_id, auth_ref.project_id) + + self.assertEqual(token.project_domain_id, auth_ref.project_domain_id) + self.assertEqual(token.project_domain_name, + auth_ref.project_domain_name) + + self.assertEqual(token.user_domain_id, auth_ref.user_domain_id) + self.assertEqual(token.user_domain_name, auth_ref.user_domain_name) + + self.assertFalse(auth_ref.domain_scoped) + self.assertTrue(auth_ref.project_scoped) + + self.assertEqual(token.audit_id, auth_ref.audit_id) + self.assertEqual(token.audit_chain_id, auth_ref.audit_chain_id) + + def test_oauth_access(self): + consumer_id = uuid.uuid4().hex + access_token_id = uuid.uuid4().hex + + token = fixture.V3Token() + token.set_project_scope() + token.set_oauth(access_token_id=access_token_id, + consumer_id=consumer_id) + + auth_ref = access.create(body=token) + + self.assertEqual(consumer_id, auth_ref.oauth_consumer_id) + self.assertEqual(access_token_id, auth_ref.oauth_access_token_id) + + self.assertEqual(consumer_id, + auth_ref._data['token']['OS-OAUTH1']['consumer_id']) + self.assertEqual( + access_token_id, + auth_ref._data['token']['OS-OAUTH1']['access_token_id']) + + def test_federated_property_standard_token(self): + """Check if is_federated property returns expected value.""" + token = fixture.V3Token() + token.set_project_scope() + auth_ref = access.create(body=token) + self.assertFalse(auth_ref.is_federated)