Remove unused code in test_revoke
Now that all the tests as been consolidated and refactored into the class that tests the actual backend as opposed to _matches, we can remove all the bloat code that isn't being put to use anymore and banish it to from keystone.. FOREVER!! The things that were removed are the following: A private method _past_time which isn't being used anymore. A private method _matches which is a spoofed version of the actual in code check_token in revoke/core.py. Methods add_event and remove_event which are no longer needed now that we check the backend revocation events directly rather than store them in a list. Entire class RevokeListTests which used to test revocation with a manually made list of revocation events and _matches but has had all of its tests refactored and moved to the class that tests the backend directly with the actual check_token in the revoke_api. Change-Id: I1cdad1ac656d8074d99d6b46c450d397f32ca428
This commit is contained in:
parent
b4990dcc31
commit
2dbd5d99bb
@ -16,7 +16,6 @@ import uuid
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from six.moves import range
|
|
||||||
from testtools import matchers
|
from testtools import matchers
|
||||||
|
|
||||||
from keystone.common import utils
|
from keystone.common import utils
|
||||||
@ -39,12 +38,6 @@ def _future_time():
|
|||||||
return future_time
|
return future_time
|
||||||
|
|
||||||
|
|
||||||
def _past_time():
|
|
||||||
expire_delta = datetime.timedelta(days=-1000)
|
|
||||||
past_time = timeutils.utcnow() + expire_delta
|
|
||||||
return past_time
|
|
||||||
|
|
||||||
|
|
||||||
def _sample_blank_token():
|
def _sample_blank_token():
|
||||||
issued_delta = datetime.timedelta(minutes=-2)
|
issued_delta = datetime.timedelta(minutes=-2)
|
||||||
issued_at = timeutils.utcnow() + issued_delta
|
issued_at = timeutils.utcnow() + issued_delta
|
||||||
@ -52,68 +45,6 @@ def _sample_blank_token():
|
|||||||
return token_data
|
return token_data
|
||||||
|
|
||||||
|
|
||||||
def _matches(event, token_values):
|
|
||||||
"""See if the token matches the revocation event.
|
|
||||||
|
|
||||||
Used as a secondary check on the logic to Check
|
|
||||||
By Tree Below: This is abrute force approach to checking.
|
|
||||||
Compare each attribute from the event with the corresponding
|
|
||||||
value from the token. If the event does not have a value for
|
|
||||||
the attribute, a match is still possible. If the event has a
|
|
||||||
value for the attribute, and it does not match the token, no match
|
|
||||||
is possible, so skip the remaining checks.
|
|
||||||
|
|
||||||
:param event: one revocation event to match
|
|
||||||
:param token_values: dictionary with set of values taken from the
|
|
||||||
token
|
|
||||||
:returns: True if the token matches the revocation event, indicating the
|
|
||||||
token has been revoked
|
|
||||||
"""
|
|
||||||
# The token has three attributes that can match the user_id
|
|
||||||
if event.user_id is not None:
|
|
||||||
for attribute_name in ['user_id', 'trustor_id', 'trustee_id']:
|
|
||||||
if event.user_id == token_values[attribute_name]:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# The token has two attributes that can match the domain_id
|
|
||||||
if event.domain_id is not None:
|
|
||||||
for attribute_name in ['identity_domain_id', 'assignment_domain_id']:
|
|
||||||
if event.domain_id == token_values[attribute_name]:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if event.domain_scope_id is not None:
|
|
||||||
if event.domain_scope_id != token_values['assignment_domain_id']:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# If any one check does not match, the while token does
|
|
||||||
# not match the event. The numerous return False indicate
|
|
||||||
# that the token is still valid and short-circuits the
|
|
||||||
# rest of the logic.
|
|
||||||
attribute_names = ['project_id',
|
|
||||||
'expires_at', 'trust_id', 'consumer_id',
|
|
||||||
'access_token_id', 'audit_id', 'audit_chain_id']
|
|
||||||
for attribute_name in attribute_names:
|
|
||||||
if getattr(event, attribute_name) is not None:
|
|
||||||
if (getattr(event, attribute_name) !=
|
|
||||||
token_values[attribute_name]):
|
|
||||||
return False
|
|
||||||
|
|
||||||
if event.role_id is not None:
|
|
||||||
roles = token_values['roles']
|
|
||||||
for role in roles:
|
|
||||||
if event.role_id == role:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
if token_values['issued_at'] > event.issued_before:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class RevokeTests(object):
|
class RevokeTests(object):
|
||||||
|
|
||||||
def _assertTokenRevoked(self, token_data):
|
def _assertTokenRevoked(self, token_data):
|
||||||
@ -562,86 +493,3 @@ class FernetSqlRevokeTests(test_backend_sql.SqlTests, RevokeTests):
|
|||||||
CONF.fernet_tokens.max_active_keys
|
CONF.fernet_tokens.max_active_keys
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_event(events, event):
|
|
||||||
events.append(event)
|
|
||||||
return event
|
|
||||||
|
|
||||||
|
|
||||||
def remove_event(events, event):
|
|
||||||
for target in events:
|
|
||||||
if target == event:
|
|
||||||
events.remove(target)
|
|
||||||
|
|
||||||
|
|
||||||
class RevokeListTests(unit.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
super(RevokeListTests, self).setUp()
|
|
||||||
self.events = []
|
|
||||||
self.revoke_events = list()
|
|
||||||
|
|
||||||
def _assertTokenRevoked(self, token_data):
|
|
||||||
self.assertTrue(any([_matches(e, token_data) for e in self.events]))
|
|
||||||
return self.assertTrue(
|
|
||||||
revoke_model.is_revoked(self.revoke_events, token_data),
|
|
||||||
'Token should be revoked')
|
|
||||||
|
|
||||||
def _assertTokenNotRevoked(self, token_data):
|
|
||||||
self.assertFalse(any([_matches(e, token_data) for e in self.events]))
|
|
||||||
return self.assertFalse(
|
|
||||||
revoke_model.is_revoked(self.revoke_events, token_data),
|
|
||||||
'Token should not be revoked')
|
|
||||||
|
|
||||||
def _revoke_by_user(self, user_id):
|
|
||||||
return add_event(
|
|
||||||
self.revoke_events,
|
|
||||||
revoke_model.RevokeEvent(user_id=user_id))
|
|
||||||
|
|
||||||
def _revoke_by_audit_chain_id(self, audit_chain_id, project_id=None,
|
|
||||||
domain_id=None):
|
|
||||||
event = add_event(
|
|
||||||
self.revoke_events,
|
|
||||||
revoke_model.RevokeEvent(audit_chain_id=audit_chain_id,
|
|
||||||
project_id=project_id,
|
|
||||||
domain_id=domain_id)
|
|
||||||
)
|
|
||||||
self.events.append(event)
|
|
||||||
return event
|
|
||||||
|
|
||||||
def _revoke_by_expiration(self, user_id, expires_at, project_id=None,
|
|
||||||
domain_id=None):
|
|
||||||
event = add_event(
|
|
||||||
self.revoke_events,
|
|
||||||
revoke_model.RevokeEvent(user_id=user_id,
|
|
||||||
expires_at=expires_at,
|
|
||||||
project_id=project_id,
|
|
||||||
domain_id=domain_id))
|
|
||||||
self.events.append(event)
|
|
||||||
return event
|
|
||||||
|
|
||||||
def _revoke_by_user_and_project(self, user_id, project_id):
|
|
||||||
event = add_event(self.revoke_events,
|
|
||||||
revoke_model.RevokeEvent(project_id=project_id,
|
|
||||||
user_id=user_id))
|
|
||||||
self.events.append(event)
|
|
||||||
return event
|
|
||||||
|
|
||||||
def remove_event(self, event):
|
|
||||||
self.events.remove(event)
|
|
||||||
remove_event(self.revoke_events, event)
|
|
||||||
|
|
||||||
def _assertEmpty(self, collection):
|
|
||||||
return self.assertEqual(0, len(collection), "collection not empty")
|
|
||||||
|
|
||||||
def test_cleanup(self):
|
|
||||||
events = self.events
|
|
||||||
self._assertEmpty(self.revoke_events)
|
|
||||||
for i in range(0, 10):
|
|
||||||
events.append(
|
|
||||||
self._revoke_by_user_and_project(uuid.uuid4().hex,
|
|
||||||
uuid.uuid4().hex))
|
|
||||||
|
|
||||||
for event in self.events:
|
|
||||||
remove_event(self.revoke_events, event)
|
|
||||||
self._assertEmpty(self.revoke_events)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user