Fix trust redelegation and associated test

Currently keystone does not allow for redelegation of trust.
The only way for creating a new trust with the currect implementation
using a trusted token was with impersonation. This patch fixes the
workflow for redelegating a trust.

Change-Id: If8f537b7525bfa26604dbce891dc193169aa21e2
Partial-Bug: 1534834
This commit is contained in:
Jorge Munoz 2016-01-19 19:33:01 +00:00 committed by Jorge Munoz
parent aa640e1e35
commit 5196237c82
5 changed files with 109 additions and 48 deletions

View File

@ -424,7 +424,8 @@ def new_policy_ref(**kwargs):
def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
impersonation=None, expires=None, role_ids=None,
role_names=None, remaining_uses=None,
allow_redelegation=False, **kwargs):
allow_redelegation=False, redelegation_count=None,
redelegated_trust_id=None, **kwargs):
ref = {
'id': uuid.uuid4().hex,
'trustor_user_id': trustor_user_id,
@ -433,8 +434,12 @@ def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
'project_id': project_id,
'remaining_uses': remaining_uses,
'allow_redelegation': allow_redelegation,
'redelegated_trust_id': redelegated_trust_id,
}
if isinstance(redelegation_count, int):
ref.update(redelegation_count=redelegation_count)
if isinstance(expires, six.string_types):
ref['expires_at'] = expires
elif isinstance(expires, dict):

View File

@ -3165,27 +3165,43 @@ class TestTrustChain(test_v3.RestfulTestCase):
def setUp(self):
super(TestTrustChain, self).setUp()
# Create trust chain
self.user_chain = list()
"""Create a trust chain using redelegation.
A trust chain is a series of trusts that are redelegated. For example,
self.user_list consists of userA, userB, and userC. The first trust in
the trust chain is going to be established between self.user and userA,
call it trustA. Then, userA is going to obtain a trust scoped token
using trustA, and with that token create a trust between userA and
userB called trustB. This pattern will continue with userB creating a
trust with userC.
So the trust chain should look something like:
trustA -> trustB -> trustC
Where:
self.user is trusting userA with trustA
userA is trusting userB with trustB
userB is trusting userC with trustC
"""
self.user_list = list()
self.trust_chain = list()
for _ in range(3):
user = unit.create_user(self.identity_api,
domain_id=self.domain_id)
self.user_chain.append(user)
self.user_list.append(user)
# trustor->trustee
trustee = self.user_chain[0]
# trustor->trustee redelegation without impersonation
trustee = self.user_list[0]
trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=trustee['id'],
project_id=self.project_id,
impersonation=True,
impersonation=False,
expires=dict(minutes=1),
role_ids=[self.role_id])
trust_ref.update(
role_ids=[self.role_id],
allow_redelegation=True,
redelegation_count=3)
# Create a trust between self.user and the first user in the list
r = self.post('/OS-TRUST/trusts',
body={'trust': trust_ref})
@ -3194,30 +3210,39 @@ class TestTrustChain(test_v3.RestfulTestCase):
user_id=trustee['id'],
password=trustee['password'],
trust_id=trust['id'])
# Generate a trusted token for the first user
trust_token = self.get_requested_token(auth_data)
self.trust_chain.append(trust)
for trustee in self.user_chain[1:]:
# Set trustor and redelegated_trust_id for next trust in the chain
next_trustor_id = trustee['id']
redelegated_trust_id = trust['id']
# Loop through the user to create a chain of redelegated trust.
for next_trustee in self.user_list[1:]:
trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=trustee['id'],
trustor_user_id=next_trustor_id,
trustee_user_id=next_trustee['id'],
project_id=self.project_id,
impersonation=True,
role_ids=[self.role_id])
trust_ref.update(
allow_redelegation=True)
impersonation=False,
role_ids=[self.role_id],
allow_redelegation=True,
redelegated_trust_id=redelegated_trust_id)
r = self.post('/OS-TRUST/trusts',
body={'trust': trust_ref},
token=trust_token)
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=trustee['id'],
password=trustee['password'],
user_id=next_trustee['id'],
password=next_trustee['password'],
trust_id=trust['id'])
trust_token = self.get_requested_token(auth_data)
self.trust_chain.append(trust)
next_trustor_id = next_trustee['id']
redelegated_trust_id = trust['id']
trustee = self.user_chain[-1]
trustee = self.user_list[-1]
trust = self.trust_chain[-1]
auth_data = self.build_authentication_request(
user_id=trustee['id'],
@ -3235,7 +3260,7 @@ class TestTrustChain(test_v3.RestfulTestCase):
self.assertValidTokenResponse(r)
def assert_trust_tokens_revoked(self, trust_id):
trustee = self.user_chain[0]
trustee = self.user_list[0]
auth_data = self.build_authentication_request(
user_id=trustee['id'],
password=trustee['password']
@ -3253,7 +3278,7 @@ class TestTrustChain(test_v3.RestfulTestCase):
trust_id)
def test_delete_trust_cascade(self):
self.assert_user_authenticate(self.user_chain[0])
self.assert_user_authenticate(self.user_list[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[0]['id']})
@ -3263,30 +3288,51 @@ class TestTrustChain(test_v3.RestfulTestCase):
self.assert_trust_tokens_revoked(self.trust_chain[0]['id'])
def test_delete_broken_chain(self):
self.assert_user_authenticate(self.user_chain[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[1]['id']})
self.assert_user_authenticate(self.user_list[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[0]['id']})
# Verify the two remaining trust have been deleted
for i in xrange(len(self.user_list) - 1):
auth_data = self.build_authentication_request(
user_id=self.user_list[i]['id'],
password=self.user_list[i]['password'])
auth_token = self.get_requested_token(auth_data)
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[i + 1]['id']},
token=auth_token,
expected_status=http_client.NOT_FOUND)
def test_trustor_roles_revoked(self):
self.assert_user_authenticate(self.user_chain[0])
self.assert_user_authenticate(self.user_list[0])
self.assignment_api.remove_role_from_user_and_project(
self.user_id, self.project_id, self.role_id
)
auth_data = self.build_authentication_request(
token=self.last_token,
trust_id=self.trust_chain[-1]['id'])
self.v3_create_token(auth_data,
expected_status=http_client.NOT_FOUND)
# Verify that users are not allowed to authenticate with trust
for i in xrange(len(self.user_list[1:])):
trustee = self.user_list[i]
auth_data = self.build_authentication_request(
user_id=trustee['id'],
password=trustee['password'])
# Attempt to authenticate with trust
token = self.get_requested_token(auth_data)
auth_data = self.build_authentication_request(
token=token,
trust_id=self.trust_chain[i - 1]['id'])
# Trustee has no delegated roles
self.v3_create_token(auth_data,
expected_status=http_client.FORBIDDEN)
def test_intermediate_user_disabled(self):
self.assert_user_authenticate(self.user_chain[0])
self.assert_user_authenticate(self.user_list[0])
disabled = self.user_chain[0]
disabled = self.user_list[0]
disabled['enabled'] = False
self.identity_api.update_user(disabled['id'], disabled)
@ -3297,9 +3343,9 @@ class TestTrustChain(test_v3.RestfulTestCase):
expected_status=http_client.FORBIDDEN)
def test_intermediate_user_deleted(self):
self.assert_user_authenticate(self.user_chain[0])
self.assert_user_authenticate(self.user_list[0])
self.identity_api.delete_user(self.user_chain[0]['id'])
self.identity_api.delete_user(self.user_list[0]['id'])
# Bypass policy enforcement
with mock.patch.object(rules, 'enforce', return_value=True):

View File

@ -357,7 +357,16 @@ class V3TokenDataHelper(object):
return
if CONF.trust.enabled and trust:
token_user_id = trust['trustor_user_id']
# If redelegated_trust_id is set, then we must traverse the
# trust_chain in order to determine who the original trustor is. We
# need to do this because the user ID of the original trustor helps
# us determine scope in the redelegated context.
if trust.get('redelegated_trust_id'):
trust_chain = self.trust_api.get_trust_pedigree(trust['id'])
token_user_id = trust_chain[-1]['trustor_user_id']
else:
token_user_id = trust['trustor_user_id']
token_project_id = trust['project_id']
# trusts do not support domains yet
token_domain_id = None

View File

@ -172,17 +172,23 @@ class TrustV3(controller.V3Controller):
raise exception.Forbidden(
_('At least one role should be specified.'))
def _get_user_role(self, trust):
def _get_trustor_roles(self, trust):
original_trust = trust.copy()
while original_trust.get('redelegated_trust_id'):
original_trust = self.trust_api.get_trust(
original_trust['redelegated_trust_id'])
if not self._attribute_is_empty(trust, 'project_id'):
return self.assignment_api.get_roles_for_user_and_project(
trust['trustor_user_id'], trust['project_id'])
original_trust['trustor_user_id'],
original_trust['project_id'])
else:
return []
def _require_trustor_has_role_in_project(self, trust):
user_roles = self._get_user_role(trust)
trustor_roles = self._get_trustor_roles(trust)
for trust_role in trust['roles']:
matching_roles = [x for x in user_roles
matching_roles = [x for x in trustor_roles
if x == trust_role['id']]
if not matching_roles:
raise exception.RoleNotFound(role_id=trust_role['id'])

View File

@ -90,14 +90,9 @@ class Manager(manager.Manager):
def get_trust_pedigree(self, trust_id):
trust = self.driver.get_trust(trust_id)
trust_chain = [trust]
if trust and trust.get('redelegated_trust_id'):
trusts = self.driver.list_trusts_for_trustor(
trust['trustor_user_id'])
while trust_chain[-1].get('redelegated_trust_id'):
for t in trusts:
if t['id'] == trust_chain[-1]['redelegated_trust_id']:
trust_chain.append(t)
break
while trust and trust.get('redelegated_trust_id'):
trust = self.driver.get_trust(trust['redelegated_trust_id'])
trust_chain.append(trust)
return trust_chain