Merge "Invalidate trust when the trustor or trustee is deleted"
This commit is contained in:
commit
f98e8fd0db
@ -4880,10 +4880,11 @@ class TestTrustChain(test_v3.RestfulTestCase):
|
||||
self.identity_api.delete_user(self.user_list[0]['id'])
|
||||
|
||||
# Bypass policy enforcement
|
||||
# Delete trustee will invalidate the trust.
|
||||
with mock.patch.object(rules, 'enforce', return_value=True):
|
||||
headers = {'X-Subject-Token': self.last_token}
|
||||
self.head('/auth/tokens', headers=headers,
|
||||
expected_status=http_client.FORBIDDEN)
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
|
||||
|
||||
class TestAuthContext(unit.TestCase):
|
||||
|
@ -16,6 +16,7 @@ import uuid
|
||||
from six.moves import http_client
|
||||
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import test_v3
|
||||
|
||||
@ -460,3 +461,50 @@ class TestTrustOperations(test_v3.RestfulTestCase):
|
||||
token=resp.headers.get('X-Subject-Token'),
|
||||
method='POST',
|
||||
expected_status=http_client.FORBIDDEN)
|
||||
|
||||
def test_trust_deleted_when_user_deleted(self):
|
||||
# create trust
|
||||
ref = unit.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
trustee_user_id=self.trustee_user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=False,
|
||||
role_ids=[self.role_id],
|
||||
allow_redelegation=True)
|
||||
resp = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
||||
|
||||
trust = self.assertValidTrustResponse(resp)
|
||||
|
||||
# list all trusts
|
||||
r = self.get('/OS-TRUST/trusts')
|
||||
self.assertEqual(1, len(r.result['trusts']))
|
||||
|
||||
# delete the trustee will delete the trust
|
||||
self.delete(
|
||||
'/users/%(user_id)s' % {'user_id': trust['trustee_user_id']})
|
||||
|
||||
self.get(
|
||||
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
|
||||
# create another user as the new trustee
|
||||
trustee_user = unit.create_user(self.identity_api,
|
||||
domain_id=self.domain_id)
|
||||
trustee_user_id = trustee_user['id']
|
||||
# create the trust again
|
||||
ref['trustee_user_id'] = trustee_user_id
|
||||
resp = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
||||
|
||||
trust = self.assertValidTrustResponse(resp)
|
||||
r = self.get('/OS-TRUST/trusts')
|
||||
self.assertEqual(1, len(r.result['trusts']))
|
||||
|
||||
# delete the trustor will delete the trust
|
||||
self.delete(
|
||||
'/users/%(user_id)s' % {'user_id': trust['trustor_user_id']})
|
||||
|
||||
# call the backend method directly to bypass authentication since the
|
||||
# user has been deleted.
|
||||
self.assertRaises(exception.TrustNotFound,
|
||||
self.trust_api.get_trust,
|
||||
trust['id'])
|
||||
|
@ -43,6 +43,20 @@ class Manager(manager.Manager):
|
||||
|
||||
def __init__(self):
|
||||
super(Manager, self).__init__(CONF.trust.driver)
|
||||
notifications.register_event_callback(
|
||||
notifications.ACTIONS.deleted, 'user',
|
||||
self._on_user_delete)
|
||||
|
||||
def _on_user_delete(self, service, resource_type, operation,
|
||||
payload):
|
||||
# NOTE(davechen): Only delete the user that is maintained by
|
||||
# keystone will delete the related trust, since we don't know
|
||||
# when a LDAP user or federation user is deleted.
|
||||
user_id = payload['resource_info']
|
||||
trusts = self.driver.list_trusts_for_trustee(user_id)
|
||||
trusts = trusts + self.driver.list_trusts_for_trustor(user_id)
|
||||
for trust in trusts:
|
||||
self.driver.delete_trust(trust['id'])
|
||||
|
||||
@staticmethod
|
||||
def _validate_redelegation(redelegated_trust, trust):
|
||||
|
Loading…
x
Reference in New Issue
Block a user