diff --git a/keystone/tests/unit/core.py b/keystone/tests/unit/core.py index b47607cab0..93e3cae2fb 100644 --- a/keystone/tests/unit/core.py +++ b/keystone/tests/unit/core.py @@ -424,7 +424,8 @@ def new_policy_ref(**kwargs): def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None, impersonation=None, expires=None, role_ids=None, role_names=None, remaining_uses=None, - allow_redelegation=False, **kwargs): + allow_redelegation=False, redelegation_count=None, + redelegated_trust_id=None, **kwargs): ref = { 'id': uuid.uuid4().hex, 'trustor_user_id': trustor_user_id, @@ -433,8 +434,12 @@ def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None, 'project_id': project_id, 'remaining_uses': remaining_uses, 'allow_redelegation': allow_redelegation, + 'redelegated_trust_id': redelegated_trust_id, } + if isinstance(redelegation_count, int): + ref.update(redelegation_count=redelegation_count) + if isinstance(expires, six.string_types): ref['expires_at'] = expires elif isinstance(expires, dict): diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index 0467a1fa23..0899f4b6b8 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -3165,27 +3165,43 @@ class TestTrustChain(test_v3.RestfulTestCase): def setUp(self): super(TestTrustChain, self).setUp() - # Create trust chain - self.user_chain = list() + """Create a trust chain using redelegation. + + A trust chain is a series of trusts that are redelegated. For example, + self.user_list consists of userA, userB, and userC. The first trust in + the trust chain is going to be established between self.user and userA, + call it trustA. Then, userA is going to obtain a trust scoped token + using trustA, and with that token create a trust between userA and + userB called trustB. This pattern will continue with userB creating a + trust with userC. + So the trust chain should look something like: + trustA -> trustB -> trustC + Where: + self.user is trusting userA with trustA + userA is trusting userB with trustB + userB is trusting userC with trustC + + """ + self.user_list = list() self.trust_chain = list() for _ in range(3): user = unit.create_user(self.identity_api, domain_id=self.domain_id) - self.user_chain.append(user) + self.user_list.append(user) - # trustor->trustee - trustee = self.user_chain[0] + # trustor->trustee redelegation without impersonation + trustee = self.user_list[0] trust_ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=trustee['id'], project_id=self.project_id, - impersonation=True, + impersonation=False, expires=dict(minutes=1), - role_ids=[self.role_id]) - trust_ref.update( + role_ids=[self.role_id], allow_redelegation=True, redelegation_count=3) + # Create a trust between self.user and the first user in the list r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) @@ -3194,30 +3210,39 @@ class TestTrustChain(test_v3.RestfulTestCase): user_id=trustee['id'], password=trustee['password'], trust_id=trust['id']) + + # Generate a trusted token for the first user trust_token = self.get_requested_token(auth_data) self.trust_chain.append(trust) - for trustee in self.user_chain[1:]: + # Set trustor and redelegated_trust_id for next trust in the chain + next_trustor_id = trustee['id'] + redelegated_trust_id = trust['id'] + + # Loop through the user to create a chain of redelegated trust. + for next_trustee in self.user_list[1:]: trust_ref = unit.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=trustee['id'], + trustor_user_id=next_trustor_id, + trustee_user_id=next_trustee['id'], project_id=self.project_id, - impersonation=True, - role_ids=[self.role_id]) - trust_ref.update( - allow_redelegation=True) + impersonation=False, + role_ids=[self.role_id], + allow_redelegation=True, + redelegated_trust_id=redelegated_trust_id) r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}, token=trust_token) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( - user_id=trustee['id'], - password=trustee['password'], + user_id=next_trustee['id'], + password=next_trustee['password'], trust_id=trust['id']) trust_token = self.get_requested_token(auth_data) self.trust_chain.append(trust) + next_trustor_id = next_trustee['id'] + redelegated_trust_id = trust['id'] - trustee = self.user_chain[-1] + trustee = self.user_list[-1] trust = self.trust_chain[-1] auth_data = self.build_authentication_request( user_id=trustee['id'], @@ -3235,7 +3260,7 @@ class TestTrustChain(test_v3.RestfulTestCase): self.assertValidTokenResponse(r) def assert_trust_tokens_revoked(self, trust_id): - trustee = self.user_chain[0] + trustee = self.user_list[0] auth_data = self.build_authentication_request( user_id=trustee['id'], password=trustee['password'] @@ -3253,7 +3278,7 @@ class TestTrustChain(test_v3.RestfulTestCase): trust_id) def test_delete_trust_cascade(self): - self.assert_user_authenticate(self.user_chain[0]) + self.assert_user_authenticate(self.user_list[0]) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': self.trust_chain[0]['id']}) @@ -3263,30 +3288,51 @@ class TestTrustChain(test_v3.RestfulTestCase): self.assert_trust_tokens_revoked(self.trust_chain[0]['id']) def test_delete_broken_chain(self): - self.assert_user_authenticate(self.user_chain[0]) - self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[1]['id']}) - + self.assert_user_authenticate(self.user_list[0]) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': self.trust_chain[0]['id']}) + # Verify the two remaining trust have been deleted + for i in xrange(len(self.user_list) - 1): + auth_data = self.build_authentication_request( + user_id=self.user_list[i]['id'], + password=self.user_list[i]['password']) + + auth_token = self.get_requested_token(auth_data) + + self.delete('/OS-TRUST/trusts/%(trust_id)s' % { + 'trust_id': self.trust_chain[i + 1]['id']}, + token=auth_token, + expected_status=http_client.NOT_FOUND) + def test_trustor_roles_revoked(self): - self.assert_user_authenticate(self.user_chain[0]) + self.assert_user_authenticate(self.user_list[0]) self.assignment_api.remove_role_from_user_and_project( self.user_id, self.project_id, self.role_id ) - auth_data = self.build_authentication_request( - token=self.last_token, - trust_id=self.trust_chain[-1]['id']) - self.v3_create_token(auth_data, - expected_status=http_client.NOT_FOUND) + # Verify that users are not allowed to authenticate with trust + for i in xrange(len(self.user_list[1:])): + trustee = self.user_list[i] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password']) + + # Attempt to authenticate with trust + token = self.get_requested_token(auth_data) + auth_data = self.build_authentication_request( + token=token, + trust_id=self.trust_chain[i - 1]['id']) + + # Trustee has no delegated roles + self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) def test_intermediate_user_disabled(self): - self.assert_user_authenticate(self.user_chain[0]) + self.assert_user_authenticate(self.user_list[0]) - disabled = self.user_chain[0] + disabled = self.user_list[0] disabled['enabled'] = False self.identity_api.update_user(disabled['id'], disabled) @@ -3297,9 +3343,9 @@ class TestTrustChain(test_v3.RestfulTestCase): expected_status=http_client.FORBIDDEN) def test_intermediate_user_deleted(self): - self.assert_user_authenticate(self.user_chain[0]) + self.assert_user_authenticate(self.user_list[0]) - self.identity_api.delete_user(self.user_chain[0]['id']) + self.identity_api.delete_user(self.user_list[0]['id']) # Bypass policy enforcement with mock.patch.object(rules, 'enforce', return_value=True): diff --git a/keystone/token/providers/common.py b/keystone/token/providers/common.py index e78a2c68ae..06f8f5ec61 100644 --- a/keystone/token/providers/common.py +++ b/keystone/token/providers/common.py @@ -357,7 +357,16 @@ class V3TokenDataHelper(object): return if CONF.trust.enabled and trust: - token_user_id = trust['trustor_user_id'] + # If redelegated_trust_id is set, then we must traverse the + # trust_chain in order to determine who the original trustor is. We + # need to do this because the user ID of the original trustor helps + # us determine scope in the redelegated context. + if trust.get('redelegated_trust_id'): + trust_chain = self.trust_api.get_trust_pedigree(trust['id']) + token_user_id = trust_chain[-1]['trustor_user_id'] + else: + token_user_id = trust['trustor_user_id'] + token_project_id = trust['project_id'] # trusts do not support domains yet token_domain_id = None diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py index d91894157c..748a2f7c34 100644 --- a/keystone/trust/controllers.py +++ b/keystone/trust/controllers.py @@ -172,17 +172,23 @@ class TrustV3(controller.V3Controller): raise exception.Forbidden( _('At least one role should be specified.')) - def _get_user_role(self, trust): + def _get_trustor_roles(self, trust): + original_trust = trust.copy() + while original_trust.get('redelegated_trust_id'): + original_trust = self.trust_api.get_trust( + original_trust['redelegated_trust_id']) + if not self._attribute_is_empty(trust, 'project_id'): return self.assignment_api.get_roles_for_user_and_project( - trust['trustor_user_id'], trust['project_id']) + original_trust['trustor_user_id'], + original_trust['project_id']) else: return [] def _require_trustor_has_role_in_project(self, trust): - user_roles = self._get_user_role(trust) + trustor_roles = self._get_trustor_roles(trust) for trust_role in trust['roles']: - matching_roles = [x for x in user_roles + matching_roles = [x for x in trustor_roles if x == trust_role['id']] if not matching_roles: raise exception.RoleNotFound(role_id=trust_role['id']) diff --git a/keystone/trust/core.py b/keystone/trust/core.py index 0bfcfaea0e..76a2da6897 100644 --- a/keystone/trust/core.py +++ b/keystone/trust/core.py @@ -90,14 +90,9 @@ class Manager(manager.Manager): def get_trust_pedigree(self, trust_id): trust = self.driver.get_trust(trust_id) trust_chain = [trust] - if trust and trust.get('redelegated_trust_id'): - trusts = self.driver.list_trusts_for_trustor( - trust['trustor_user_id']) - while trust_chain[-1].get('redelegated_trust_id'): - for t in trusts: - if t['id'] == trust_chain[-1]['redelegated_trust_id']: - trust_chain.append(t) - break + while trust and trust.get('redelegated_trust_id'): + trust = self.driver.get_trust(trust['redelegated_trust_id']) + trust_chain.append(trust) return trust_chain