# Copyright (c) 2017 Ericsson AB. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Copyright (c) 2019 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # from dcdbsync.dbsyncclient import base from dcdbsync.dbsyncclient.base import get_json from dcdbsync.dbsyncclient import exceptions class RevokeEvent(base.Resource): resource_name = 'token_revoke_event' def __init__(self, manager, id, domain_id, project_id, user_id, role_id, trust_id, consumer_id, access_token_id, issued_before, expires_at, revoked_at, audit_id, audit_chain_id): self.manager = manager self.id = id self.domain_id = domain_id self.project_id = project_id self.user_id = user_id self.role_id = role_id self.trust_id = trust_id self.consumer_id = consumer_id self.access_token_id = access_token_id self.issued_before = issued_before self.expires_at = expires_at self.revoked_at = revoked_at self.audit_id = audit_id self.audit_chain_id = audit_chain_id def info(self): resource_info = dict() resource_info.update({self.resource_name: {'id': self.id, 'project_id': self.project_id, 'user_id': self.user_id, 'role_id': self.role_id, 'audit_id': self.audit_id, 'issued_before': self.issued_before}}) return resource_info class revoke_event_manager(base.ResourceManager): resource_class = RevokeEvent def revoke_event_create(self, url, data): resp = self.http_client.post(url, data) # Unauthorized if resp.status_code == 401: raise exceptions.Unauthorized('Unauthorized request') if resp.status_code != 201: self._raise_api_exception(resp) # Converted into python dict json_object = get_json(resp) return json_object def revoke_events_list(self, url): resp = self.http_client.get(url) # Unauthorized if resp.status_code == 401: raise exceptions.Unauthorized('Unauthorized request') if resp.status_code != 200: self._raise_api_exception(resp) # Converted into python dict json_objects = get_json(resp) revoke_events = [] for json_object in json_objects: json_object = json_object.get('revocation_event') revoke_event = RevokeEvent( self, id=json_object['id'], domain_id=json_object['domain_id'], project_id=json_object['project_id'], user_id=json_object['user_id'], role_id=json_object['role_id'], trust_id=json_object['trust_id'], consumer_id=json_object['consumer_id'], access_token_id=json_object['access_token_id'], issued_before=json_object['issued_before'], expires_at=json_object['expires_at'], revoked_at=json_object['revoked_at'], audit_id=json_object['audit_id'], audit_chain_id=json_object['audit_chain_id']) revoke_events.append(revoke_event) return revoke_events def _revoke_event_detail(self, url): resp = self.http_client.get(url) # Unauthorized if resp.status_code == 401: raise exceptions.Unauthorized('Unauthorized request') if resp.status_code != 200: self._raise_api_exception(resp) # Return revoke_event details in original json format, # ie, without convert it into python dict return resp.content def _revoke_event_delete(self, url): resp = self.http_client.delete(url) # Unauthorized if resp.status_code == 401: raise exceptions.Unauthorized('Unauthorized request') # NotFound if resp.status_code == 404: raise exceptions.NotFound('Requested item not found') if resp.status_code != 204: self._raise_api_exception(resp) def add_revoke_event(self, data): url = '/identity/token-revocation-events/' return self.revoke_event_create(url, data) def list_revoke_events(self): url = '/identity/token-revocation-events/' return self.revoke_events_list(url) def revoke_event_detail(self, user_id=None, audit_id=None): if user_id: url = '/identity/token-revocation-events/users/%s' % user_id elif audit_id: url = '/identity/token-revocation-events/audits/%s' % audit_id else: raise exceptions.\ IllegalArgumentException('Token revocation event user ID' ' or audit ID required.') return self._revoke_event_detail(url) def delete_revoke_event(self, user_id=None, audit_id=None): if user_id: url = '/identity/token-revocation-events/users/%s' % user_id elif audit_id: url = '/identity/token-revocation-events/audits/%s' % audit_id else: raise exceptions.\ IllegalArgumentException('Token revocation event ID' ' or audit ID required.') return self._revoke_event_delete(url)