diff --git a/keystone/tests/unit/test_revoke.py b/keystone/tests/unit/test_revoke.py index e034bc49e6..da4e009779 100644 --- a/keystone/tests/unit/test_revoke.py +++ b/keystone/tests/unit/test_revoke.py @@ -16,7 +16,6 @@ import uuid import mock from oslo_utils import timeutils -from six.moves import range from testtools import matchers from keystone.common import utils @@ -39,12 +38,6 @@ def _future_time(): return future_time -def _past_time(): - expire_delta = datetime.timedelta(days=-1000) - past_time = timeutils.utcnow() + expire_delta - return past_time - - def _sample_blank_token(): issued_delta = datetime.timedelta(minutes=-2) issued_at = timeutils.utcnow() + issued_delta @@ -52,68 +45,6 @@ def _sample_blank_token(): return token_data -def _matches(event, token_values): - """See if the token matches the revocation event. - - Used as a secondary check on the logic to Check - By Tree Below: This is abrute force approach to checking. - Compare each attribute from the event with the corresponding - value from the token. If the event does not have a value for - the attribute, a match is still possible. If the event has a - value for the attribute, and it does not match the token, no match - is possible, so skip the remaining checks. - - :param event: one revocation event to match - :param token_values: dictionary with set of values taken from the - token - :returns: True if the token matches the revocation event, indicating the - token has been revoked - """ - # The token has three attributes that can match the user_id - if event.user_id is not None: - for attribute_name in ['user_id', 'trustor_id', 'trustee_id']: - if event.user_id == token_values[attribute_name]: - break - else: - return False - - # The token has two attributes that can match the domain_id - if event.domain_id is not None: - for attribute_name in ['identity_domain_id', 'assignment_domain_id']: - if event.domain_id == token_values[attribute_name]: - break - else: - return False - - if event.domain_scope_id is not None: - if event.domain_scope_id != token_values['assignment_domain_id']: - return False - - # If any one check does not match, the while token does - # not match the event. The numerous return False indicate - # that the token is still valid and short-circuits the - # rest of the logic. - attribute_names = ['project_id', - 'expires_at', 'trust_id', 'consumer_id', - 'access_token_id', 'audit_id', 'audit_chain_id'] - for attribute_name in attribute_names: - if getattr(event, attribute_name) is not None: - if (getattr(event, attribute_name) != - token_values[attribute_name]): - return False - - if event.role_id is not None: - roles = token_values['roles'] - for role in roles: - if event.role_id == role: - break - else: - return False - if token_values['issued_at'] > event.issued_before: - return False - return True - - class RevokeTests(object): def _assertTokenRevoked(self, token_data): @@ -562,86 +493,3 @@ class FernetSqlRevokeTests(test_backend_sql.SqlTests, RevokeTests): CONF.fernet_tokens.max_active_keys ) ) - - -def add_event(events, event): - events.append(event) - return event - - -def remove_event(events, event): - for target in events: - if target == event: - events.remove(target) - - -class RevokeListTests(unit.TestCase): - def setUp(self): - super(RevokeListTests, self).setUp() - self.events = [] - self.revoke_events = list() - - def _assertTokenRevoked(self, token_data): - self.assertTrue(any([_matches(e, token_data) for e in self.events])) - return self.assertTrue( - revoke_model.is_revoked(self.revoke_events, token_data), - 'Token should be revoked') - - def _assertTokenNotRevoked(self, token_data): - self.assertFalse(any([_matches(e, token_data) for e in self.events])) - return self.assertFalse( - revoke_model.is_revoked(self.revoke_events, token_data), - 'Token should not be revoked') - - def _revoke_by_user(self, user_id): - return add_event( - self.revoke_events, - revoke_model.RevokeEvent(user_id=user_id)) - - def _revoke_by_audit_chain_id(self, audit_chain_id, project_id=None, - domain_id=None): - event = add_event( - self.revoke_events, - revoke_model.RevokeEvent(audit_chain_id=audit_chain_id, - project_id=project_id, - domain_id=domain_id) - ) - self.events.append(event) - return event - - def _revoke_by_expiration(self, user_id, expires_at, project_id=None, - domain_id=None): - event = add_event( - self.revoke_events, - revoke_model.RevokeEvent(user_id=user_id, - expires_at=expires_at, - project_id=project_id, - domain_id=domain_id)) - self.events.append(event) - return event - - def _revoke_by_user_and_project(self, user_id, project_id): - event = add_event(self.revoke_events, - revoke_model.RevokeEvent(project_id=project_id, - user_id=user_id)) - self.events.append(event) - return event - - def remove_event(self, event): - self.events.remove(event) - remove_event(self.revoke_events, event) - - def _assertEmpty(self, collection): - return self.assertEqual(0, len(collection), "collection not empty") - - def test_cleanup(self): - events = self.events - self._assertEmpty(self.revoke_events) - for i in range(0, 10): - events.append( - self._revoke_by_user_and_project(uuid.uuid4().hex, - uuid.uuid4().hex)) - - for event in self.events: - remove_event(self.revoke_events, event) - self._assertEmpty(self.revoke_events)