diff --git a/keystone/models/revoke_model.py b/keystone/models/revoke_model.py index 4efe0b36d9..78917cb5dd 100644 --- a/keystone/models/revoke_model.py +++ b/keystone/models/revoke_model.py @@ -169,13 +169,6 @@ def matches(event, token_values): # that the token is still valid and short-circuits the # rest of the logic. - # The token has three attributes that can match the user_id. - if event.user_id is not None and event.user_id not in ( - token_values['user_id'], - token_values['trustor_id'], - token_values['trustee_id'],): - return False - # The token has two attributes that can match the domain_id. if event.domain_id is not None and event.domain_id not in( token_values['identity_domain_id'], @@ -188,10 +181,6 @@ def matches(event, token_values): # If an event specifies an attribute name, but it does not match, the token # is not revoked. - if event.project_id is not None and event.project_id not in ( - token_values['project_id'],): - return False - if event.expires_at is not None and event.expires_at not in ( token_values['expires_at'],): return False @@ -208,10 +197,6 @@ def matches(event, token_values): token_values['access_token_id'],): return False - if event.audit_id is not None and event.audit_id not in ( - token_values['audit_id'],): - return False - if event.audit_chain_id is not None and event.audit_chain_id not in ( token_values['audit_chain_id'],): return False @@ -220,9 +205,6 @@ def matches(event, token_values): token_values['roles']): return False - if token_values['issued_at'] > event.issued_before: - return False - return True diff --git a/keystone/tests/unit/test_revoke.py b/keystone/tests/unit/test_revoke.py index 328948171e..26a256a69d 100644 --- a/keystone/tests/unit/test_revoke.py +++ b/keystone/tests/unit/test_revoke.py @@ -51,6 +51,43 @@ def _sample_blank_token(): return token_data +def _sample_data(): + user_ids = [] + project_ids = [] + role_ids = [] + for i in range(0, 3): + user_ids.append(uuid.uuid4().hex) + project_ids.append(uuid.uuid4().hex) + role_ids.append(uuid.uuid4().hex) + + # For testing purposes, create 3 project tokens with a different user_id, + # role_id, and project_id which will be used to verify that revoking by + # grant on certain user_id, project_id, and role_id pairs leaves these + # project_tokens unrevoked if only one of the revoked columns are matched + # but not all of them as the expected behavior dictates + + project_tokens = [] + i = len(project_tokens) + project_tokens.append(_sample_blank_token()) + project_tokens[i]['user_id'] = user_ids[1] + project_tokens[i]['project_id'] = project_ids[0] + project_tokens[i]['roles'] = [role_ids[0]] + + i = len(project_tokens) + project_tokens.append(_sample_blank_token()) + project_tokens[i]['user_id'] = user_ids[0] + project_tokens[i]['project_id'] = project_ids[1] + project_tokens[i]['roles'] = [role_ids[0]] + + i = len(project_tokens) + project_tokens.append(_sample_blank_token()) + project_tokens[i]['user_id'] = user_ids[0] + project_tokens[i]['project_id'] = project_ids[0] + project_tokens[i]['roles'] = [role_ids[1]] + + return user_ids, project_ids, role_ids, project_tokens + + def _matches(event, token_values): """See if the token matches the revocation event. @@ -116,14 +153,22 @@ def _matches(event, token_values): class RevokeTests(object): def _assertTokenRevoked(self, events, token_data): + backend = sql.Revoke() + if events: + self.assertTrue(revoke_model.is_revoked(events, token_data), + 'Token should be revoked') return self.assertTrue( - revoke_model.is_revoked(events, token_data), - 'Token should be revoked') + revoke_model.is_revoked(backend.list_events(token=token_data), + token_data), 'Token should be revoked') def _assertTokenNotRevoked(self, events, token_data): + backend = sql.Revoke() + if events: + self.assertTrue(revoke_model.is_revoked(events, token_data), + 'Token should be revoked') return self.assertFalse( - revoke_model.is_revoked(events, token_data), - 'Token should not be revoked') + revoke_model.is_revoked(backend.list_events(token=token_data), + token_data), 'Token should not be revoked') def test_list(self): self.revoke_api.revoke_by_user(user_id=1) @@ -339,6 +384,108 @@ class RevokeTests(object): self.assertEqual( 1, len(revocation_backend.list_events(token=fourth_token))) + def _user_field_test(self, field_name): + token = _sample_blank_token() + token[field_name] = uuid.uuid4().hex + self.revoke_api.revoke_by_user(user_id=token[field_name]) + self._assertTokenRevoked(None, token) + token2 = _sample_blank_token() + token2[field_name] = uuid.uuid4().hex + self._assertTokenNotRevoked(None, token2) + + def test_revoke_by_user(self): + self._user_field_test('user_id') + + def test_revoke_by_user_matches_trustee(self): + self._user_field_test('trustee_id') + + def test_revoke_by_user_matches_trustor(self): + self._user_field_test('trustor_id') + + def test_revoke_by_audit_id(self): + token = _sample_blank_token() + # Audit ID and Audit Chain ID are populated with the same value + # if the token is an original token + token['audit_id'] = uuid.uuid4().hex + token['audit_chain_id'] = token['audit_id'] + self.revoke_api.revoke_by_audit_id(audit_id=token['audit_id']) + self._assertTokenRevoked(None, token) + + token2 = _sample_blank_token() + token2['audit_id'] = uuid.uuid4().hex + token2['audit_chain_id'] = token2['audit_id'] + self._assertTokenNotRevoked(None, token2) + + def test_by_project_grant(self): + user_ids, project_ids, role_ids, project_tokens = _sample_data() + token1 = _sample_blank_token() + token1['roles'] = role_ids[0] + token1['user_id'] = user_ids[0] + token1['project_id'] = project_ids[0] + + token2 = _sample_blank_token() + token2['roles'] = role_ids[1] + token2['user_id'] = user_ids[1] + token2['project_id'] = project_ids[1] + + token3 = _sample_blank_token() + token3['roles'] = [role_ids[0], + role_ids[1], + role_ids[2]] + token3['user_id'] = user_ids[2] + token3['project_id'] = project_ids[2] + + # Check that all tokens are revoked at the start + self._assertTokenNotRevoked(None, token1) + self._assertTokenNotRevoked(None, token2) + self._assertTokenNotRevoked(None, token3) + for token in project_tokens: + self._assertTokenNotRevoked(None, token) + + self.revoke_api.revoke_by_grant(role_id=role_ids[0], + user_id=user_ids[0], + project_id=project_ids[0]) + + # Only the first token should be revoked + self._assertTokenRevoked(None, token1) + self._assertTokenNotRevoked(None, token2) + self._assertTokenNotRevoked(None, token3) + for token in project_tokens: + self._assertTokenNotRevoked(None, token) + + self.revoke_api.revoke_by_grant(role_id=role_ids[1], + user_id=user_ids[1], + project_id=project_ids[1]) + + # Tokens 1 and 2 should be revoked now + self._assertTokenRevoked(None, token1) + self._assertTokenRevoked(None, token2) + self._assertTokenNotRevoked(None, token3) + for token in project_tokens: + self._assertTokenNotRevoked(None, token) + + # test that multiple roles with a single user and project get revoked + # and invalidate token3 + self.revoke_api.revoke_by_grant(role_id=role_ids[0], + user_id=user_ids[2], + project_id=project_ids[2]) + + self.revoke_api.revoke_by_grant(role_id=role_ids[1], + user_id=user_ids[2], + project_id=project_ids[2]) + + self.revoke_api.revoke_by_grant(role_id=role_ids[2], + user_id=user_ids[2], + project_id=project_ids[2]) + + # Tokens 1, 2, and 3 should now be revoked leaving project_tokens + # unrevoked. + self._assertTokenRevoked(None, token1) + self._assertTokenRevoked(None, token2) + self._assertTokenRevoked(None, token3) + for token in project_tokens: + self._assertTokenNotRevoked(None, token) + @mock.patch.object(timeutils, 'utcnow') def test_expired_events_are_removed(self, mock_utcnow): def _sample_token_values(): @@ -413,46 +560,6 @@ class RevokeListTests(unit.TestCase): super(RevokeListTests, self).setUp() self.events = [] self.revoke_events = list() - self._sample_data() - - def _sample_data(self): - user_ids = [] - project_ids = [] - role_ids = [] - for i in range(0, 3): - user_ids.append(uuid.uuid4().hex) - project_ids.append(uuid.uuid4().hex) - role_ids.append(uuid.uuid4().hex) - - project_tokens = [] - i = len(project_tokens) - project_tokens.append(_sample_blank_token()) - project_tokens[i]['user_id'] = user_ids[0] - project_tokens[i]['project_id'] = project_ids[0] - project_tokens[i]['roles'] = [role_ids[1]] - - i = len(project_tokens) - project_tokens.append(_sample_blank_token()) - project_tokens[i]['user_id'] = user_ids[1] - project_tokens[i]['project_id'] = project_ids[0] - project_tokens[i]['roles'] = [role_ids[0]] - - i = len(project_tokens) - project_tokens.append(_sample_blank_token()) - project_tokens[i]['user_id'] = user_ids[0] - project_tokens[i]['project_id'] = project_ids[1] - project_tokens[i]['roles'] = [role_ids[0]] - - token_to_revoke = _sample_blank_token() - token_to_revoke['user_id'] = user_ids[0] - token_to_revoke['project_id'] = project_ids[0] - token_to_revoke['roles'] = [role_ids[0]] - - self.project_tokens = project_tokens - self.user_ids = user_ids - self.project_ids = project_ids - self.role_ids = role_ids - self.token_to_revoke = token_to_revoke def _assertTokenRevoked(self, token_data): self.assertTrue(any([_matches(e, token_data) for e in self.events])) @@ -471,13 +578,6 @@ class RevokeListTests(unit.TestCase): self.revoke_events, revoke_model.RevokeEvent(user_id=user_id)) - def _revoke_by_audit_id(self, audit_id): - event = add_event( - self.revoke_events, - revoke_model.RevokeEvent(audit_id=audit_id)) - self.events.append(event) - return event - def _revoke_by_audit_chain_id(self, audit_chain_id, project_id=None, domain_id=None): event = add_event( @@ -500,17 +600,6 @@ class RevokeListTests(unit.TestCase): self.events.append(event) return event - def _revoke_by_grant(self, role_id, user_id=None, - domain_id=None, project_id=None): - event = add_event( - self.revoke_events, - revoke_model.RevokeEvent(user_id=user_id, - role_id=role_id, - domain_id=domain_id, - project_id=project_id)) - self.events.append(event) - return event - def _revoke_by_user_and_project(self, user_id, project_id): event = add_event(self.revoke_events, revoke_model.RevokeEvent(project_id=project_id, @@ -537,48 +626,6 @@ class RevokeListTests(unit.TestCase): revoke_model.RevokeEvent(domain_id=domain_id)) self.events.append(event) - def _user_field_test(self, field_name): - user_id = uuid.uuid4().hex - event = self._revoke_by_user(user_id) - self.events.append(event) - token_data_u1 = _sample_blank_token() - token_data_u1[field_name] = user_id - self._assertTokenRevoked(token_data_u1) - token_data_u2 = _sample_blank_token() - token_data_u2[field_name] = uuid.uuid4().hex - self._assertTokenNotRevoked(token_data_u2) - remove_event(self.revoke_events, event) - self.events.remove(event) - self._assertTokenNotRevoked(token_data_u1) - - def test_revoke_by_user(self): - self._user_field_test('user_id') - - def test_revoke_by_user_matches_trustee(self): - self._user_field_test('trustee_id') - - def test_revoke_by_user_matches_trustor(self): - self._user_field_test('trustor_id') - - def test_revoke_by_audit_id(self): - audit_id = common.build_audit_info(parent_audit_id=None)[0] - token_data_1 = _sample_blank_token() - # Audit ID and Audit Chain ID are populated with the same value - # if the token is an original token - token_data_1['audit_id'] = audit_id - token_data_1['audit_chain_id'] = audit_id - event = self._revoke_by_audit_id(audit_id) - self._assertTokenRevoked(token_data_1) - - audit_id_2 = common.build_audit_info(parent_audit_id=audit_id)[0] - token_data_2 = _sample_blank_token() - token_data_2['audit_id'] = audit_id_2 - token_data_2['audit_chain_id'] = audit_id - self._assertTokenNotRevoked(token_data_2) - - self.remove_event(event) - self._assertTokenNotRevoked(token_data_1) - def test_revoke_by_audit_chain_id(self): audit_id = common.build_audit_info(parent_audit_id=None)[0] token_data_1 = _sample_blank_token() @@ -603,57 +650,6 @@ class RevokeListTests(unit.TestCase): self.events.remove(event) remove_event(self.revoke_events, event) - def test_by_project_grant(self): - token_to_revoke = self.token_to_revoke - tokens = self.project_tokens - - self._assertTokenNotRevoked(token_to_revoke) - for token in tokens: - self._assertTokenNotRevoked(token) - - event = self._revoke_by_grant(role_id=self.role_ids[0], - user_id=self.user_ids[0], - project_id=self.project_ids[0]) - - self._assertTokenRevoked(token_to_revoke) - for token in tokens: - self._assertTokenNotRevoked(token) - - self.remove_event(event) - - self._assertTokenNotRevoked(token_to_revoke) - for token in tokens: - self._assertTokenNotRevoked(token) - - token_to_revoke['roles'] = [self.role_ids[0], - self.role_ids[1], - self.role_ids[2]] - - event = self._revoke_by_grant(role_id=self.role_ids[0], - user_id=self.user_ids[0], - project_id=self.project_ids[0]) - self._assertTokenRevoked(token_to_revoke) - self.remove_event(event) - self._assertTokenNotRevoked(token_to_revoke) - - event = self._revoke_by_grant(role_id=self.role_ids[1], - user_id=self.user_ids[0], - project_id=self.project_ids[0]) - self._assertTokenRevoked(token_to_revoke) - self.remove_event(event) - self._assertTokenNotRevoked(token_to_revoke) - - self._revoke_by_grant(role_id=self.role_ids[0], - user_id=self.user_ids[0], - project_id=self.project_ids[0]) - self._revoke_by_grant(role_id=self.role_ids[1], - user_id=self.user_ids[0], - project_id=self.project_ids[0]) - self._revoke_by_grant(role_id=self.role_ids[2], - user_id=self.user_ids[0], - project_id=self.project_ids[0]) - self._assertTokenRevoked(token_to_revoke) - def test_by_project_and_user_and_role(self): user_id1 = uuid.uuid4().hex user_id2 = uuid.uuid4().hex