Tokens API additional models

1. Added a test data file and more unit tests for the tokens api access model.
2. Completed fleshing up all models included in the tokens api access model.
3. Finished testing the tokens api access model.
4. Renamed the tests folder to metatests.
5. Completed fleshing up all tokens api response models.
6. Fixed code review issues.
7. Refactored reponses models and added a new test in access_test.py.
8. Rebased changes and uploaded a new patch set.
9. Fixed reviewer comment - made access.json in a more readable format.

Change-Id: Ifb5c03a4de622dc0ed29485fc26e354632c777c3
This commit is contained in:
Charles Kimpolo
2013-05-24 14:50:13 -05:00
parent 745e1c86cd
commit d8491c765e
8 changed files with 468 additions and 206 deletions

View File

@@ -34,6 +34,10 @@ class BaseIdentityModel(AutoMarshallingModel):
cls._remove_namespace(element, V2_0Constants.XML_NS_OPENSTACK_COMMON)
cls._remove_namespace(element, V2_0Constants.XML_NS_ATOM)
@classmethod
def _remove_namespace(cls, element, XML_NS):
raise NotImplementedError
class BaseIdentityListModel(AutoMarshallingListModel):

View File

@@ -14,46 +14,43 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from cloudcafe.identity.v2_0.tokens_api.models.base import \
BaseIdentityModel, BaseIdentityListModel
class Access(BaseIdentityModel):
TAG = 'access'
def __init__(self):
self.metadata = {}
self.service_catalog = ServiceCatalog()
self.user = User()
self.token = Token()
def __init__(self, metadata=None, service_catalog=None,
user=None, token=None):
super(Access, self).__init__()
self.metadata = metadata
self.service_catalog = service_catalog
self.user = user
self.token = token
def get_service(self, name):
for service in self.service_catalog:
if service.name == name:
return service
return None
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
return cls._dict_to_obj(json_dict.get(cls.TAG))
return cls._dict_to_obj(json_dict.get('access'))
@classmethod
def _dict_to_obj(cls, json_dict):
access = Access()
access.metadata = json_dict.get('metadata')
access.service_catalog = ServiceCatalog._list_to_obj(
json_dict.get(ServiceCatalog.TAG))
access.user = User._dict_to_obj(json_dict.get(User.TAG))
access.token = Token._dict_to_obj(json_dict.get(Token.TAG))
access = Access(metadata=json_dict.get('metadata'),
service_catalog=ServiceCatalog._list_to_obj(
json_dict.get('serviceCatalog')),
user=User._dict_to_obj(json_dict.get('user')),
token=Token._dict_to_obj(json_dict.get('token')))
return access
class ServiceCatalog(BaseIdentityListModel):
TAG = 'serviceCatalog'
def __init__(self, services=None):
super(ServiceCatalog, self).__init__()
self.extend(services or [])
@classmethod
def _list_to_obj(cls, service_dict_list):
@@ -61,41 +58,48 @@ class ServiceCatalog(BaseIdentityListModel):
for service_dict in service_dict_list:
service = Service._dict_to_obj(service_dict)
service_catalog.append(service)
return service_catalog
class Service(BaseIdentityModel):
def __init__(self):
self.endpoints = EndpointList()
self.endpoint_links = []
self.name = None
self.type = None
def __init__(self, endpoints=None, endpoint_links=None,
name=None, type_=None):
"""
@param endpoints:
@type: EndpointList
"""
super(Service, self).__init__()
self.endpoints = endpoints
self.endpoint_links = endpoint_links
self.name = name
self.type_ = type_
def get_endpoint(self, region):
'''Returns the endpoint that matches the provided region,
or None if such an endpoint is not found
'''
for ep in self.endpoints:
if getattr(ep, 'region'):
if str(ep.region).lower() == str(region).lower():
return ep
"""
Returns the endpoint that matches the provided region,
or None if such an endpoint is not found
"""
for endpoint in self.endpoints:
if getattr(endpoint, 'region'):
if str(endpoint.region).lower() == str(region).lower():
return endpoint
@classmethod
def _dict_to_obj(cls, json_dict):
service = Service()
service.endpoints = EndpointList._list_to_obj(
json_dict.get(EndpointList.TAG))
json_dict.get('endpoints'))
service.endpoint_links = json_dict.get('endpoints_links')
service.name = json_dict.get('name')
service.type = json_dict.get('type')
service.type_ = json_dict.get('type')
return service
class EndpointList(BaseIdentityListModel):
TAG = 'endpoints'
def __init__(self, endpoints=None):
super(EndpointList, self).__init__()
self.extend(endpoints or [])
@classmethod
def _list_to_obj(cls, endpoint_dict_list):
@@ -103,106 +107,114 @@ class EndpointList(BaseIdentityListModel):
for endpoint_dict in endpoint_dict_list:
endpoint = Endpoint._dict_to_obj(endpoint_dict)
endpoint_list.append(endpoint)
return endpoint_list
class Endpoint(BaseIdentityModel):
def __init__(self, admin_url, internal_url, public_url,
region, id):
def __init__(self, id_=None, admin_url=None,
internal_url=None, public_url=None, region=None):
super(Endpoint, self).__init__()
self.admin_url = admin_url
self.internal_url = internal_url
self.public_url = public_url
self.region = region
self.id_ = id
self.id_ = id_
@classmethod
def _dict_to_obj(cls, json_dict):
endpoint = Endpoint(json_dict.get('adminURL'),
json_dict.get('internalURL'),
json_dict.get('publicURL'),
json_dict.get('region'),
json_dict.get('id'))
endpoint = Endpoint(admin_url=json_dict.get('adminURL'),
internal_url=json_dict.get('internalURL'),
public_url=json_dict.get('publicURL'),
region=json_dict.get('region'),
id_=json_dict.get('id'))
return endpoint
class Token(BaseIdentityModel):
TAG = 'token'
def __init__(self):
self.expires = None
self.issued_at = None
self.id_ = None
self.tenant = Tenant()
def __init__(self, id_=None, issued_at=None,
expires=None, tenant=None):
super(Token, self).__init__()
self.expires = expires
self.issued_at = issued_at
self.id_ = id_
self.tenant = tenant
@classmethod
def _dict_to_obj(cls, json_dict):
token_model = Token()
token_model.tenant = Tenant._dict_to_obj(json_dict.get('tenant'))
token_model.expires = json_dict.get('expires')
token_model.issued_at = json_dict.get('issued_at')
token_model.id_ = json_dict.get('id')
return token_model
token = Token(tenant=Tenant._dict_to_obj(json_dict.get('tenant')),
expires=json_dict.get('expires'),
issued_at=json_dict.get('issued_at'),
id_=json_dict.get('id'))
return token
class Tenant(BaseIdentityModel):
TAG = 'tenant'
def __init__(self):
self.description = None
self.enabled = None
self.id_ = None
self.name = None
def __init__(self, id_=None, name=None,
enabled=None, description=None):
super(Tenant, self).__init__()
self.id_ = id_
self.name = name
self.enabled = enabled
self.description = description
@classmethod
def _dict_to_obj(cls, json_dict):
tenant = Tenant()
tenant.description = json_dict.get('description')
tenant.enabled = json_dict.get('enabled')
tenant.id_ = json_dict.get('id')
tenant.name = json_dict.get('name')
tenant = Tenant(id_=json_dict.get('id'),
name=json_dict.get('name'),
enabled=json_dict.get('enabled'),
description=json_dict.get('description'))
return tenant
class User(BaseIdentityModel):
TAG = 'user'
def __init__(self):
self.id_ = None
self.name = None
self.roles = RoleList()
self.role_links = []
self.username = None
def __init__(self, id_=None, name=None,
username=None, roles=None, roles_links=None):
"""
@param id_
@return:
@param roles
@type RoleList
"""
super(User, self).__init__()
self.id_ = id_
self.name = name
self.username = username
self.roles = roles
self.roles_links = roles_links
@classmethod
def _dict_to_obj(cls, json_dict):
user = User()
user.id_ = json_dict.get('id')
user.name = json_dict.get('name')
user.roles = RoleList._list_to_obj(json_dict.get(RoleList.TAG))
user.role_links = json_dict.get('role_links')
user.username = json_dict.get('username')
user = User(id_=json_dict.get('id'),
name=json_dict.get('name'),
roles=RoleList._list_to_obj(json_dict.get('roles')),
roles_links=json_dict.get('roles_links'),
username=json_dict.get('username'))
return user
class RoleList(BaseIdentityListModel):
TAG = 'roles'
def __init__(self, roles=None):
super(RoleList, self).__init__()
self.extend(roles or [])
@classmethod
def _list_to_obj(cls, role_dict_list):
role_list = RoleList()
for role_dict in role_dict_list:
role = Role(name=role_dict.get('name'))
role = Role._dict_to_obj(role_dict)
role_list.append(role)
return role_list
class Role(BaseIdentityListModel):
def __init__(self, name=None):
class Role(BaseIdentityModel):
def __init__(self, id_=None, name=None):
super(Role, self).__init__()
self.id_ = id_
self.name = name
@classmethod
def _dict_to_obj(cls, role_dict):
role = Role(name=role_dict.get('name'),
id_=role_dict.get('id'))
return role

View File

@@ -21,78 +21,66 @@ from cloudcafe.identity.v2_0.tokens_api.models.base import \
class Endpoints(BaseIdentityListModel):
ROOT_TAG = 'endpoints'
def __init__(self, endpoints=None):
super(Endpoints, self).__init__()
self.extend(endpoints)
self.extend(endpoints or [])
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
return cls._list_to_obj(json_dict.get(cls.ROOT_TAG))
return cls._list_to_obj(json_dict.get('endpoints'))
@classmethod
def _list_to_obj(cls, list_):
ret = {cls.ROOT_TAG: [Endpoint(**endpoint) for endpoint in list_]}
ret = {'endpoints': [Endpoint(**endpoint) for endpoint in list_]}
return Endpoints(**ret)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
cls._remove_identity_xml_namespaces(element)
if element.tag != cls.ROOT_TAG:
if element.tag != 'endpoints':
return None
return cls._xml_list_to_obj(element.findall(Endpoint.ROOT_TAG))
return cls._xml_list_to_obj(element.findall('endpoint'))
@classmethod
def _xml_list_to_obj(cls, xml_list):
kwargs = {cls.ROOT_TAG: [Endpoint._xml_ele_to_obj(endpoint)
for endpoint in xml_list]}
kwargs = {'endpoints': [Endpoint._xml_ele_to_obj(endpoint)
for endpoint in xml_list]}
return Endpoints(**kwargs)
class Endpoint(BaseIdentityModel):
ROOT_TAG = 'endpoint'
def __init__(self, tenantId=None, region=None, id=None, publicURL=None,
name=None, adminURL=None, type=None, internalURL=None,
versionId=None, versionInfo=None, versionList=None):
# version=None
def __init__(self, tenant_id=None, region=None, id_=None, public_url=None,
name=None, admin_url=None, type_=None, internal_url=None):
super(Endpoint, self).__init__()
self.tenantId = tenantId
self.tenant_id = tenant_id
self.region = region
self.id = id
self.publicURL = publicURL
self.id_ = id_
self.public_url = public_url
self.name = name
self.adminURL = adminURL
self.type = type
self.internalURL = internalURL
self.versionId = versionId
self.versionInfo = versionInfo
self.versionList = versionList
self.admin_url = admin_url
self.type_ = type_
self.internal_url = internal_url
#currently json has version attributes as part of the Endpoint
#xml has it as a seprate element.
# self.version = version
#xml has it as a separate element.
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
return cls._dict_to_obj(json_dict.get(cls.ROOT_TAG))
return cls._dict_to_obj(json_dict.get('endpoint'))
@classmethod
def _dict_to_obj(cls, dic):
if Version.ROOT_TAG in dic:
dic[Version.ROOT_TAG] = Version(**dic[Version.ROOT_TAG])
if 'version' in dic:
dic['version'] = Version(**dic['version'])
return Endpoint(**dic)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
cls._remove_identity_xml_namespaces(element)
if element.tag != cls.ROOT_TAG:
if element.tag != 'endpoint':
return None
return cls._xml_ele_to_obj(element)
@@ -109,7 +97,7 @@ class Endpoint(BaseIdentityModel):
kwargs['id'] = int(xml_ele.get('id'))
except (ValueError, TypeError):
kwargs['id'] = xml_ele.get('id')
version = xml_ele.find(Version.ROOT_TAG)
version = xml_ele.find('version')
if version is not None:
kwargs['versionId'] = version.get('id')
kwargs['versionInfo'] = version.get('info')
@@ -118,13 +106,50 @@ class Endpoint(BaseIdentityModel):
class Version(BaseIdentityModel):
ROOT_TAG = 'version'
def __init__(self, id=None, info=None, list=None):
self.id = id
self.info = info
self.list = list
def __init__(self, status=None, updated=None, media_types=None,
id_=None, links=None):
super(Version, self).__init__()
self.status = status
self.updated = updated
self.media_types = media_types
self.id_ = id_
self.links = links
class MediaTypes(BaseIdentityListModel):
def __init__(self, media_types=None):
"""
An object that represents a mediatypes response object.
"""
super(MediaTypes, self).__init__()
self.extend(media_types or [])
class MediaType(BaseIdentityModel):
def __init__(self, base=None, type_=None):
"""
An object that represents a mediatype response object.
"""
super(MediaType, self).__init__()
self.base = base
self.type_ = type_
class Links(BaseIdentityListModel):
def __init__(self, links=None):
"""
An object that represents a links response object.
"""
super(Links, self).__init__()
self.extend(links or [])
class Link(BaseIdentityModel):
def __init__(self, href=None, type_=None, rel=None):
"""
An object that represents a link response object.
"""
super(Link, self).__init__()
self.href = href
self.type_ = type_
self.rel = rel

View File

@@ -21,77 +21,62 @@ from cloudcafe.identity.v2_0.tokens_api.models.base import \
class Roles(BaseIdentityListModel):
ROOT_TAG = 'roles'
def __init__(self, roles=None):
'''An object that represents an users response object.
"""
An object that represents a roles response object.
Keyword arguments:
'''
"""
super(Roles, self).__init__()
self.extend(roles)
self.extend(roles or [])
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
return cls._list_to_obj(json_dict.get(cls.ROOT_TAG))
return cls._list_to_obj(json_dict.get('roles'))
@classmethod
def _list_to_obj(cls, list_):
ret = {cls.ROOT_TAG: [Role(**role) for role in list_]}
ret = {'roles': [Role(**role) for role in list_]}
return Roles(**ret)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
cls._remove_identity_xml_namespaces(element)
if element.tag != cls.ROOT_TAG:
if element.tag != 'roles':
return None
return cls._xml_list_to_obj(element.findall(Role.ROOT_TAG))
return cls._xml_list_to_obj(element.findall('role'))
@classmethod
def _xml_list_to_obj(cls, xml_list):
kwargs = {cls.ROOT_TAG: [Role._xml_ele_to_obj(role)
for role in xml_list]}
kwargs = {'roles': [Role._xml_ele_to_obj(role) for role in xml_list]}
return Roles(**kwargs)
class Role(BaseIdentityModel):
ROOT_TAG = 'role'
def __init__(self, id=None, name=None, description=None, serviceId=None,
tenantId=None, propagate = None, weight = None):
def __init__(self, id_=None, name=None, description=None):
super(Role, self).__init__()
self.id = id
self.id_ = id_
self.name = name
self.description = description
self.serviceId = serviceId
self.tenantId = tenantId
self.weight = weight
self.propagate = propagate
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
json_dict['role']['propagate'] = json_dict['role'].pop('RAX-AUTH:propagate')
json_dict['role']['weight'] = json_dict['role'].pop('RAX-AUTH:Weight')
return Role(**json_dict.get(cls.ROOT_TAG))
return Role(**json_dict.get('role'))
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
cls._remove_identity_xml_namespaces(element)
if element.tag != cls.ROOT_TAG:
if element.tag != 'role':
return None
return cls._xml_ele_to_obj(element)
@classmethod
def _xml_ele_to_obj(cls, xml_ele):
kwargs = {'name': xml_ele.get('name'),
'description': xml_ele.get('description'),
'serviceId': xml_ele.get('serviceId'),
'tenantId': xml_ele.get('tenantId')}
'description': xml_ele.get('description')}
try:
kwargs['id'] = int(xml_ele.get('id'))
except (ValueError, TypeError):

View File

@@ -28,7 +28,7 @@ class Tenants(BaseIdentityListModel):
'''An object that represents an tenants response object.
'''
super(Tenants, self).__init__()
self.extend(tenants)
self.extend(tenants or [])
@classmethod
def _json_to_obj(cls, serialized_str):

View File

@@ -18,80 +18,74 @@ import json
from xml.etree import ElementTree
from cloudcafe.identity.v2_0.tokens_api.models.base import \
BaseIdentityModel, BaseIdentityListModel
from cloudcafe.identity.v2_0.tokens_api.models.responses.role import Roles, Role
from cloudcafe.identity.v2_0.tokens_api.models.responses.role import Roles
class Users(BaseIdentityListModel):
ROOT_TAG = 'users'
def __init__(self, users=None):
super(Users, self).__init__()
self.extend(users)
self.extend(users or [])
@classmethod
def _json_to_obj(cls, serialized_str):
ret = json.loads(serialized_str)
ret[cls.ROOT_TAG] = [User._dict_to_obj(user)
for user in ret.get(cls.ROOT_TAG)]
ret['users'] = [User._dict_to_obj(user) for user in ret.get('users')]
return Users(**ret)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
cls._remove_identity_xml_namespaces(element)
if element.tag != cls.ROOT_TAG:
if element.tag != 'users':
return None
return cls._xml_list_to_obj(element.findall(User.ROOT_TAG))
return cls._xml_list_to_obj(element.findall('user'))
@classmethod
def _xml_list_to_obj(cls, xml_list):
kwargs = {cls.ROOT_TAG: [User._xml_ele_to_obj(ele)
for ele in xml_list]}
kwargs = {'users': [User._xml_ele_to_obj(ele) for ele in xml_list]}
return Users(**kwargs)
class User(BaseIdentityModel):
ROOT_TAG = 'user'
def __init__(self, id=None, enabled=None, username=None, updated=None,
created=None, email=None, domainId=None, defaultRegion=None,
password=None, roles=None, name=None, display_name=None):
'''An object that represents an users response object.
def __init__(self, id_=None, name=None, tenant_id=None,
enabled=None, email=None, roles=None):
"""
An object that represents an users response object.
Keyword arguments:
'''
@param id_:
@param name:
@param tenant_id:
@param enabled:
@param email:
@param roles:
"""
super(User, self).__init__()
self.id = id
self.enabled = enabled
self.username = username
self.updated = updated
self.created = created
self.email = email
self.domainId = domainId
self.defaultRegion = defaultRegion
self.password = password
self.roles = roles
self.id_ = id_
self.name = name
self.display_name = display_name
self.tenant_id = tenant_id
self.enabled = enabled
self.email = email
self.roles = roles
def get_role(self, id=None, name=None):
'''Returns the role object if it matches all provided criteria'''
def get_role(self, id_=None, name=None):
"""Returns the role object if it matches all provided criteria
"""
for role in self.roles:
if id and not name:
if role.id == id:
if id_ and not name:
if role.id_ == id_:
return role
if name and not id:
if name and not id_:
if role.name == name:
return role
if name and id:
if (role.name == name) and (role.id == id):
if name and id_:
if (role.name == name) and (role.id_ == id_):
return role
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
user = cls._dict_to_obj(json_dict.get(cls.ROOT_TAG))
user = cls._dict_to_obj(json_dict.get('user'))
return user
@classmethod
@@ -109,16 +103,16 @@ class User(BaseIdentityModel):
if 'display-name' in json_dict:
json_dict['display_name'] = json_dict['display-name']
del json_dict['display-name']
if Roles.ROOT_TAG in json_dict:
json_dict[Roles.ROOT_TAG] = Roles.\
_list_to_obj(json_dict[Roles.ROOT_TAG])
if 'roles' in json_dict:
json_dict['roles'] = Roles.\
_list_to_obj(json_dict['roles'])
return User(**json_dict)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
cls._remove_identity_xml_namespaces(element)
if element.tag != cls.ROOT_TAG:
if element.tag != 'user':
return None
return cls._xml_ele_to_obj(element)
@@ -139,11 +133,11 @@ class User(BaseIdentityModel):
kwargs['id'] = xml_ele.get('id')
if xml_ele.get('enabled') is not None:
kwargs['enabled'] = json.loads(xml_ele.get('enabled').lower())
roles = xml_ele.find(Roles.ROOT_TAG)
roles = xml_ele.find('roles')
if roles is not None:
#if roles is not a list it is a single element with a list of
#role elements
roles = roles.findall(Role.ROOT_TAG)
roles = roles.findall('role')
if roles is not None:
kwargs['roles'] = Roles._xml_list_to_obj(roles)
return User(**kwargs)