Merge "Remove unused statements in matches"
This commit is contained in:
commit
f353b0620a
@ -169,13 +169,6 @@ def matches(event, token_values):
|
||||
# that the token is still valid and short-circuits the
|
||||
# rest of the logic.
|
||||
|
||||
# The token has three attributes that can match the user_id.
|
||||
if event.user_id is not None and event.user_id not in (
|
||||
token_values['user_id'],
|
||||
token_values['trustor_id'],
|
||||
token_values['trustee_id'],):
|
||||
return False
|
||||
|
||||
# The token has two attributes that can match the domain_id.
|
||||
if event.domain_id is not None and event.domain_id not in(
|
||||
token_values['identity_domain_id'],
|
||||
@ -188,10 +181,6 @@ def matches(event, token_values):
|
||||
|
||||
# If an event specifies an attribute name, but it does not match, the token
|
||||
# is not revoked.
|
||||
if event.project_id is not None and event.project_id not in (
|
||||
token_values['project_id'],):
|
||||
return False
|
||||
|
||||
if event.expires_at is not None and event.expires_at not in (
|
||||
token_values['expires_at'],):
|
||||
return False
|
||||
@ -208,10 +197,6 @@ def matches(event, token_values):
|
||||
token_values['access_token_id'],):
|
||||
return False
|
||||
|
||||
if event.audit_id is not None and event.audit_id not in (
|
||||
token_values['audit_id'],):
|
||||
return False
|
||||
|
||||
if event.audit_chain_id is not None and event.audit_chain_id not in (
|
||||
token_values['audit_chain_id'],):
|
||||
return False
|
||||
@ -220,9 +205,6 @@ def matches(event, token_values):
|
||||
token_values['roles']):
|
||||
return False
|
||||
|
||||
if token_values['issued_at'] > event.issued_before:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
@ -51,6 +51,43 @@ def _sample_blank_token():
|
||||
return token_data
|
||||
|
||||
|
||||
def _sample_data():
|
||||
user_ids = []
|
||||
project_ids = []
|
||||
role_ids = []
|
||||
for i in range(0, 3):
|
||||
user_ids.append(uuid.uuid4().hex)
|
||||
project_ids.append(uuid.uuid4().hex)
|
||||
role_ids.append(uuid.uuid4().hex)
|
||||
|
||||
# For testing purposes, create 3 project tokens with a different user_id,
|
||||
# role_id, and project_id which will be used to verify that revoking by
|
||||
# grant on certain user_id, project_id, and role_id pairs leaves these
|
||||
# project_tokens unrevoked if only one of the revoked columns are matched
|
||||
# but not all of them as the expected behavior dictates
|
||||
|
||||
project_tokens = []
|
||||
i = len(project_tokens)
|
||||
project_tokens.append(_sample_blank_token())
|
||||
project_tokens[i]['user_id'] = user_ids[1]
|
||||
project_tokens[i]['project_id'] = project_ids[0]
|
||||
project_tokens[i]['roles'] = [role_ids[0]]
|
||||
|
||||
i = len(project_tokens)
|
||||
project_tokens.append(_sample_blank_token())
|
||||
project_tokens[i]['user_id'] = user_ids[0]
|
||||
project_tokens[i]['project_id'] = project_ids[1]
|
||||
project_tokens[i]['roles'] = [role_ids[0]]
|
||||
|
||||
i = len(project_tokens)
|
||||
project_tokens.append(_sample_blank_token())
|
||||
project_tokens[i]['user_id'] = user_ids[0]
|
||||
project_tokens[i]['project_id'] = project_ids[0]
|
||||
project_tokens[i]['roles'] = [role_ids[1]]
|
||||
|
||||
return user_ids, project_ids, role_ids, project_tokens
|
||||
|
||||
|
||||
def _matches(event, token_values):
|
||||
"""See if the token matches the revocation event.
|
||||
|
||||
@ -116,14 +153,22 @@ def _matches(event, token_values):
|
||||
class RevokeTests(object):
|
||||
|
||||
def _assertTokenRevoked(self, events, token_data):
|
||||
backend = sql.Revoke()
|
||||
if events:
|
||||
self.assertTrue(revoke_model.is_revoked(events, token_data),
|
||||
'Token should be revoked')
|
||||
return self.assertTrue(
|
||||
revoke_model.is_revoked(events, token_data),
|
||||
'Token should be revoked')
|
||||
revoke_model.is_revoked(backend.list_events(token=token_data),
|
||||
token_data), 'Token should be revoked')
|
||||
|
||||
def _assertTokenNotRevoked(self, events, token_data):
|
||||
backend = sql.Revoke()
|
||||
if events:
|
||||
self.assertTrue(revoke_model.is_revoked(events, token_data),
|
||||
'Token should be revoked')
|
||||
return self.assertFalse(
|
||||
revoke_model.is_revoked(events, token_data),
|
||||
'Token should not be revoked')
|
||||
revoke_model.is_revoked(backend.list_events(token=token_data),
|
||||
token_data), 'Token should not be revoked')
|
||||
|
||||
def test_list(self):
|
||||
self.revoke_api.revoke_by_user(user_id=1)
|
||||
@ -339,6 +384,108 @@ class RevokeTests(object):
|
||||
self.assertEqual(
|
||||
1, len(revocation_backend.list_events(token=fourth_token)))
|
||||
|
||||
def _user_field_test(self, field_name):
|
||||
token = _sample_blank_token()
|
||||
token[field_name] = uuid.uuid4().hex
|
||||
self.revoke_api.revoke_by_user(user_id=token[field_name])
|
||||
self._assertTokenRevoked(None, token)
|
||||
token2 = _sample_blank_token()
|
||||
token2[field_name] = uuid.uuid4().hex
|
||||
self._assertTokenNotRevoked(None, token2)
|
||||
|
||||
def test_revoke_by_user(self):
|
||||
self._user_field_test('user_id')
|
||||
|
||||
def test_revoke_by_user_matches_trustee(self):
|
||||
self._user_field_test('trustee_id')
|
||||
|
||||
def test_revoke_by_user_matches_trustor(self):
|
||||
self._user_field_test('trustor_id')
|
||||
|
||||
def test_revoke_by_audit_id(self):
|
||||
token = _sample_blank_token()
|
||||
# Audit ID and Audit Chain ID are populated with the same value
|
||||
# if the token is an original token
|
||||
token['audit_id'] = uuid.uuid4().hex
|
||||
token['audit_chain_id'] = token['audit_id']
|
||||
self.revoke_api.revoke_by_audit_id(audit_id=token['audit_id'])
|
||||
self._assertTokenRevoked(None, token)
|
||||
|
||||
token2 = _sample_blank_token()
|
||||
token2['audit_id'] = uuid.uuid4().hex
|
||||
token2['audit_chain_id'] = token2['audit_id']
|
||||
self._assertTokenNotRevoked(None, token2)
|
||||
|
||||
def test_by_project_grant(self):
|
||||
user_ids, project_ids, role_ids, project_tokens = _sample_data()
|
||||
token1 = _sample_blank_token()
|
||||
token1['roles'] = role_ids[0]
|
||||
token1['user_id'] = user_ids[0]
|
||||
token1['project_id'] = project_ids[0]
|
||||
|
||||
token2 = _sample_blank_token()
|
||||
token2['roles'] = role_ids[1]
|
||||
token2['user_id'] = user_ids[1]
|
||||
token2['project_id'] = project_ids[1]
|
||||
|
||||
token3 = _sample_blank_token()
|
||||
token3['roles'] = [role_ids[0],
|
||||
role_ids[1],
|
||||
role_ids[2]]
|
||||
token3['user_id'] = user_ids[2]
|
||||
token3['project_id'] = project_ids[2]
|
||||
|
||||
# Check that all tokens are revoked at the start
|
||||
self._assertTokenNotRevoked(None, token1)
|
||||
self._assertTokenNotRevoked(None, token2)
|
||||
self._assertTokenNotRevoked(None, token3)
|
||||
for token in project_tokens:
|
||||
self._assertTokenNotRevoked(None, token)
|
||||
|
||||
self.revoke_api.revoke_by_grant(role_id=role_ids[0],
|
||||
user_id=user_ids[0],
|
||||
project_id=project_ids[0])
|
||||
|
||||
# Only the first token should be revoked
|
||||
self._assertTokenRevoked(None, token1)
|
||||
self._assertTokenNotRevoked(None, token2)
|
||||
self._assertTokenNotRevoked(None, token3)
|
||||
for token in project_tokens:
|
||||
self._assertTokenNotRevoked(None, token)
|
||||
|
||||
self.revoke_api.revoke_by_grant(role_id=role_ids[1],
|
||||
user_id=user_ids[1],
|
||||
project_id=project_ids[1])
|
||||
|
||||
# Tokens 1 and 2 should be revoked now
|
||||
self._assertTokenRevoked(None, token1)
|
||||
self._assertTokenRevoked(None, token2)
|
||||
self._assertTokenNotRevoked(None, token3)
|
||||
for token in project_tokens:
|
||||
self._assertTokenNotRevoked(None, token)
|
||||
|
||||
# test that multiple roles with a single user and project get revoked
|
||||
# and invalidate token3
|
||||
self.revoke_api.revoke_by_grant(role_id=role_ids[0],
|
||||
user_id=user_ids[2],
|
||||
project_id=project_ids[2])
|
||||
|
||||
self.revoke_api.revoke_by_grant(role_id=role_ids[1],
|
||||
user_id=user_ids[2],
|
||||
project_id=project_ids[2])
|
||||
|
||||
self.revoke_api.revoke_by_grant(role_id=role_ids[2],
|
||||
user_id=user_ids[2],
|
||||
project_id=project_ids[2])
|
||||
|
||||
# Tokens 1, 2, and 3 should now be revoked leaving project_tokens
|
||||
# unrevoked.
|
||||
self._assertTokenRevoked(None, token1)
|
||||
self._assertTokenRevoked(None, token2)
|
||||
self._assertTokenRevoked(None, token3)
|
||||
for token in project_tokens:
|
||||
self._assertTokenNotRevoked(None, token)
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_expired_events_are_removed(self, mock_utcnow):
|
||||
def _sample_token_values():
|
||||
@ -413,46 +560,6 @@ class RevokeListTests(unit.TestCase):
|
||||
super(RevokeListTests, self).setUp()
|
||||
self.events = []
|
||||
self.revoke_events = list()
|
||||
self._sample_data()
|
||||
|
||||
def _sample_data(self):
|
||||
user_ids = []
|
||||
project_ids = []
|
||||
role_ids = []
|
||||
for i in range(0, 3):
|
||||
user_ids.append(uuid.uuid4().hex)
|
||||
project_ids.append(uuid.uuid4().hex)
|
||||
role_ids.append(uuid.uuid4().hex)
|
||||
|
||||
project_tokens = []
|
||||
i = len(project_tokens)
|
||||
project_tokens.append(_sample_blank_token())
|
||||
project_tokens[i]['user_id'] = user_ids[0]
|
||||
project_tokens[i]['project_id'] = project_ids[0]
|
||||
project_tokens[i]['roles'] = [role_ids[1]]
|
||||
|
||||
i = len(project_tokens)
|
||||
project_tokens.append(_sample_blank_token())
|
||||
project_tokens[i]['user_id'] = user_ids[1]
|
||||
project_tokens[i]['project_id'] = project_ids[0]
|
||||
project_tokens[i]['roles'] = [role_ids[0]]
|
||||
|
||||
i = len(project_tokens)
|
||||
project_tokens.append(_sample_blank_token())
|
||||
project_tokens[i]['user_id'] = user_ids[0]
|
||||
project_tokens[i]['project_id'] = project_ids[1]
|
||||
project_tokens[i]['roles'] = [role_ids[0]]
|
||||
|
||||
token_to_revoke = _sample_blank_token()
|
||||
token_to_revoke['user_id'] = user_ids[0]
|
||||
token_to_revoke['project_id'] = project_ids[0]
|
||||
token_to_revoke['roles'] = [role_ids[0]]
|
||||
|
||||
self.project_tokens = project_tokens
|
||||
self.user_ids = user_ids
|
||||
self.project_ids = project_ids
|
||||
self.role_ids = role_ids
|
||||
self.token_to_revoke = token_to_revoke
|
||||
|
||||
def _assertTokenRevoked(self, token_data):
|
||||
self.assertTrue(any([_matches(e, token_data) for e in self.events]))
|
||||
@ -471,13 +578,6 @@ class RevokeListTests(unit.TestCase):
|
||||
self.revoke_events,
|
||||
revoke_model.RevokeEvent(user_id=user_id))
|
||||
|
||||
def _revoke_by_audit_id(self, audit_id):
|
||||
event = add_event(
|
||||
self.revoke_events,
|
||||
revoke_model.RevokeEvent(audit_id=audit_id))
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def _revoke_by_audit_chain_id(self, audit_chain_id, project_id=None,
|
||||
domain_id=None):
|
||||
event = add_event(
|
||||
@ -500,17 +600,6 @@ class RevokeListTests(unit.TestCase):
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def _revoke_by_grant(self, role_id, user_id=None,
|
||||
domain_id=None, project_id=None):
|
||||
event = add_event(
|
||||
self.revoke_events,
|
||||
revoke_model.RevokeEvent(user_id=user_id,
|
||||
role_id=role_id,
|
||||
domain_id=domain_id,
|
||||
project_id=project_id))
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def _revoke_by_user_and_project(self, user_id, project_id):
|
||||
event = add_event(self.revoke_events,
|
||||
revoke_model.RevokeEvent(project_id=project_id,
|
||||
@ -537,48 +626,6 @@ class RevokeListTests(unit.TestCase):
|
||||
revoke_model.RevokeEvent(domain_id=domain_id))
|
||||
self.events.append(event)
|
||||
|
||||
def _user_field_test(self, field_name):
|
||||
user_id = uuid.uuid4().hex
|
||||
event = self._revoke_by_user(user_id)
|
||||
self.events.append(event)
|
||||
token_data_u1 = _sample_blank_token()
|
||||
token_data_u1[field_name] = user_id
|
||||
self._assertTokenRevoked(token_data_u1)
|
||||
token_data_u2 = _sample_blank_token()
|
||||
token_data_u2[field_name] = uuid.uuid4().hex
|
||||
self._assertTokenNotRevoked(token_data_u2)
|
||||
remove_event(self.revoke_events, event)
|
||||
self.events.remove(event)
|
||||
self._assertTokenNotRevoked(token_data_u1)
|
||||
|
||||
def test_revoke_by_user(self):
|
||||
self._user_field_test('user_id')
|
||||
|
||||
def test_revoke_by_user_matches_trustee(self):
|
||||
self._user_field_test('trustee_id')
|
||||
|
||||
def test_revoke_by_user_matches_trustor(self):
|
||||
self._user_field_test('trustor_id')
|
||||
|
||||
def test_revoke_by_audit_id(self):
|
||||
audit_id = common.build_audit_info(parent_audit_id=None)[0]
|
||||
token_data_1 = _sample_blank_token()
|
||||
# Audit ID and Audit Chain ID are populated with the same value
|
||||
# if the token is an original token
|
||||
token_data_1['audit_id'] = audit_id
|
||||
token_data_1['audit_chain_id'] = audit_id
|
||||
event = self._revoke_by_audit_id(audit_id)
|
||||
self._assertTokenRevoked(token_data_1)
|
||||
|
||||
audit_id_2 = common.build_audit_info(parent_audit_id=audit_id)[0]
|
||||
token_data_2 = _sample_blank_token()
|
||||
token_data_2['audit_id'] = audit_id_2
|
||||
token_data_2['audit_chain_id'] = audit_id
|
||||
self._assertTokenNotRevoked(token_data_2)
|
||||
|
||||
self.remove_event(event)
|
||||
self._assertTokenNotRevoked(token_data_1)
|
||||
|
||||
def test_revoke_by_audit_chain_id(self):
|
||||
audit_id = common.build_audit_info(parent_audit_id=None)[0]
|
||||
token_data_1 = _sample_blank_token()
|
||||
@ -603,57 +650,6 @@ class RevokeListTests(unit.TestCase):
|
||||
self.events.remove(event)
|
||||
remove_event(self.revoke_events, event)
|
||||
|
||||
def test_by_project_grant(self):
|
||||
token_to_revoke = self.token_to_revoke
|
||||
tokens = self.project_tokens
|
||||
|
||||
self._assertTokenNotRevoked(token_to_revoke)
|
||||
for token in tokens:
|
||||
self._assertTokenNotRevoked(token)
|
||||
|
||||
event = self._revoke_by_grant(role_id=self.role_ids[0],
|
||||
user_id=self.user_ids[0],
|
||||
project_id=self.project_ids[0])
|
||||
|
||||
self._assertTokenRevoked(token_to_revoke)
|
||||
for token in tokens:
|
||||
self._assertTokenNotRevoked(token)
|
||||
|
||||
self.remove_event(event)
|
||||
|
||||
self._assertTokenNotRevoked(token_to_revoke)
|
||||
for token in tokens:
|
||||
self._assertTokenNotRevoked(token)
|
||||
|
||||
token_to_revoke['roles'] = [self.role_ids[0],
|
||||
self.role_ids[1],
|
||||
self.role_ids[2]]
|
||||
|
||||
event = self._revoke_by_grant(role_id=self.role_ids[0],
|
||||
user_id=self.user_ids[0],
|
||||
project_id=self.project_ids[0])
|
||||
self._assertTokenRevoked(token_to_revoke)
|
||||
self.remove_event(event)
|
||||
self._assertTokenNotRevoked(token_to_revoke)
|
||||
|
||||
event = self._revoke_by_grant(role_id=self.role_ids[1],
|
||||
user_id=self.user_ids[0],
|
||||
project_id=self.project_ids[0])
|
||||
self._assertTokenRevoked(token_to_revoke)
|
||||
self.remove_event(event)
|
||||
self._assertTokenNotRevoked(token_to_revoke)
|
||||
|
||||
self._revoke_by_grant(role_id=self.role_ids[0],
|
||||
user_id=self.user_ids[0],
|
||||
project_id=self.project_ids[0])
|
||||
self._revoke_by_grant(role_id=self.role_ids[1],
|
||||
user_id=self.user_ids[0],
|
||||
project_id=self.project_ids[0])
|
||||
self._revoke_by_grant(role_id=self.role_ids[2],
|
||||
user_id=self.user_ids[0],
|
||||
project_id=self.project_ids[0])
|
||||
self._assertTokenRevoked(token_to_revoke)
|
||||
|
||||
def test_by_project_and_user_and_role(self):
|
||||
user_id1 = uuid.uuid4().hex
|
||||
user_id2 = uuid.uuid4().hex
|
||||
|
Loading…
Reference in New Issue
Block a user