Changes to allow additional calls to support endpoint template CRUD and additional checks on existing method.

Fixing issues that did not allow ldap to pass.

Change-Id: Ie98e50f8731105f24ebad2691a9e38d20edead28
This commit is contained in:
Yogeshwar Srikrishnan 2011-08-08 17:31:26 -05:00
parent 4c6ed61ed1
commit 1bfcb5c00c
13 changed files with 406 additions and 71 deletions

View File

@ -247,6 +247,9 @@ class BaseEndpointTemplateAPI(object):
def create(self, values):
raise NotImplementedError
def delete(self, id):
raise NotImplementedError
def get(self, id):
raise NotImplementedError
@ -282,7 +285,10 @@ class BaseEndpointTemplateAPI(object):
class BaseServiceAPI:
def create(self, values):
raise NotImplementedError
def delete(self, id):
raise NotImplementedError
def get(self, id):
raise NotImplementedError

View File

@ -44,7 +44,7 @@ UserRoleAssociation = create_model(
Endpoints = create_model(
'Endpoints', ['tenant_id', 'endpoint_template_id'])
Role = create_model(
'Role', ['id', 'desc'])
'Role', ['id', 'desc', 'service_id'])
Tenant = create_model(
'Tenant', ['id', 'desc', 'enabled'])
User = create_model(

View File

@ -26,6 +26,13 @@ class EndpointTemplateAPI(BaseEndpointTemplateAPI):
endpoint_template.save()
return endpoint_template
def delete(self, id, session=None):
if not session:
session = get_session()
with session.begin():
endpoint_template = self.get(id, session)
session.delete(endpoint_template)
def get(self, id, session=None):
if not session:
session = get_session()

View File

@ -92,7 +92,7 @@ class UserRoleAssociation(Base, KeystoneBase):
class Endpoints(Base, KeystoneBase):
__tablename__ = 'endpoints'
id = Column(Integer, primary_key=True)
tenant_id = Column(String(255), ForeignKey('tenants.id'))
tenant_id = Column(String(255))
endpoint_template_id = Column(Integer, ForeignKey('endpoint_templates.id'))
__table_args__ = (
UniqueConstraint("endpoint_template_id", "tenant_id"), {})
@ -122,7 +122,6 @@ class Tenant(Base, KeystoneBase):
id = Column(String(255), primary_key=True, unique=True)
desc = Column(String(255))
enabled = Column(Integer)
endpoints = relationship('Endpoints', backref='tenant', cascade="all")
class User(Base, KeystoneBase):

View File

@ -18,6 +18,21 @@ class EndpointTemplatesController(wsgi.Controller):
utils.get_auth_token(req), marker, limit, url)
return utils.send_result(200, req, endpoint_templates)
@utils.wrap_error
def add_endpoint_template(self, req):
endpoint_template = utils.\
get_normalized_request_content(EndpointTemplate, req)
return utils.send_result(201, req,
config.SERVICE.
add_endpoint_template(\
utils.get_auth_token(req), endpoint_template))
@utils.wrap_error
def delete_endpoint_template(self, req, endpoint_templates_id):
rval = config.SERVICE.delete_endpoint_template(
utils.get_auth_token(req), endpoint_templates_id)
return utils.send_result(204, req, rval)
@utils.wrap_error
def get_endpoint_template(self, req, endpoint_templates_id):
endpoint_template = config.SERVICE.get_endpoint_template(

View File

@ -215,7 +215,7 @@ class IdentityService(object):
def create_user(self, admin_token, user):
self.__validate_admin_token(admin_token)
dtenant = self.validate_and_fetch_user_tenant(user.tenant_id)
self.validate_and_fetch_user_tenant(user.tenant_id)
if not isinstance(user, User):
raise fault.BadRequestFault("Expecting a User")
@ -303,10 +303,6 @@ class IdentityService(object):
duser = api.USER.get(user_id)
if not duser:
raise fault.ItemNotFoundFault("The user could not be found")
dtenant = api.TENANT.get(duser.tenant_id)
ts = []
return User_Update(None, duser.id, duser.tenant_id,
duser.email, duser.enabled)
@ -384,7 +380,7 @@ class IdentityService(object):
if duser == None:
raise fault.ItemNotFoundFault("The user could not be found")
dtenant = self.validate_and_fetch_user_tenant(user.tenant_id)
self.validate_and_fetch_user_tenant(user.tenant_id)
values = {'tenant_id': user.tenant_id}
api.USER.update(user_id, values)
return User_Update(None,
@ -417,13 +413,13 @@ class IdentityService(object):
token = auth.Token(dtoken.expires, dtoken.id, dtoken.tenant_id)
ts = []
if dtoken.tenant_id:
droleRefs = api.ROLE.ref_get_all_tenant_roles(duser.id,
drole_refs = api.ROLE.ref_get_all_tenant_roles(duser.id,
dtoken.tenant_id)
for droleRef in droleRefs:
for droleRef in drole_refs:
ts.append(RoleRef(droleRef.id, droleRef.role_id,
droleRef.tenant_id))
droleRefs = api.ROLE.ref_get_all_global_roles(duser.id)
for droleRef in droleRefs:
drole_refs = api.ROLE.ref_get_all_global_roles(duser.id)
for droleRef in drole_refs:
ts.append(RoleRef(droleRef.id, droleRef.role_id,
droleRef.tenant_id))
user = auth.User(duser.id, duser.tenant_id, RoleRefs(ts, []))
@ -469,9 +465,9 @@ class IdentityService(object):
def __validate_admin_token(self, token_id):
(token, user) = self.__validate_token(token_id)
for roleRef in api.ROLE.ref_get_all_global_roles(user.id):
if roleRef.role_id == backends.KEYSTONEADMINROLE and \
roleRef.tenant_id is None:
for role_ref in api.ROLE.ref_get_all_global_roles(user.id):
if role_ref.role_id == backends.KEYSTONEADMINROLE and \
role_ref.tenant_id is None:
return (token, user)
raise fault.UnauthorizedFault(
@ -479,16 +475,16 @@ class IdentityService(object):
def __validate_service_or_keystone_admin_token(self, token_id):
(token, user) = self.__validate_token(token_id)
for roleRef in api.ROLE.ref_get_all_global_roles(user.id):
if (roleRef.role_id == backends.KEYSTONEADMINROLE or \
roleRef.role_id == backends.KEYSTONESERVICEADMINROLE) and \
roleRef.tenant_id is None:
for role_ref in api.ROLE.ref_get_all_global_roles(user.id):
if (role_ref.role_id == backends.KEYSTONEADMINROLE or \
role_ref.role_id == backends.KEYSTONESERVICEADMINROLE) and \
role_ref.tenant_id is None:
return (token, user)
raise fault.UnauthorizedFault(
"You are not authorized to make this call")
def create_role(self, admin_token, role):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
if not isinstance(role, Role):
raise fault.BadRequestFault("Expecting a Role")
@ -499,20 +495,28 @@ class IdentityService(object):
if api.ROLE.get(role.role_id) != None:
raise fault.RoleConflictFault(
"A role with that id already exists")
#Check if the passed service exist.
if role.service_id != None and len(role.service_id.strip()) > 0 and\
api.SERVICE.get(role.service_id) == None:
raise fault.BadRequestFault(
"A service with that id doesnt exist.")
drole = models.Role()
drole.id = role.role_id
drole.desc = role.desc
drole.service_id = role.service_id
api.ROLE.create(drole)
return role
def get_roles(self, admin_token, marker, limit, url):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
ts = []
droles = api.ROLE.get_page(marker, limit)
for drole in droles:
ts.append(Role(drole.id,
drole.desc))
drole.desc, drole.service_id))
prev, next = api.ROLE.get_page_markers(marker, limit)
links = []
if prev:
@ -524,68 +528,68 @@ class IdentityService(object):
return Roles(ts, links)
def get_role(self, admin_token, role_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
drole = api.ROLE.get(role_id)
if not drole:
raise fault.ItemNotFoundFault("The role could not be found")
return Role(drole.id, drole.desc)
return Role(drole.id, drole.desc, drole.service_id)
def delete_role(self, admin_token, role_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
drole = api.ROLE.get(role_id)
if not drole:
raise fault.ItemNotFoundFault("The role could not be found")
api.ROLE.delete(role_id)
def create_role_ref(self, admin_token, user_id, roleRef):
self.__validate_admin_token(admin_token)
def create_role_ref(self, admin_token, user_id, role_ref):
self.__validate_service_or_keystone_admin_token(admin_token)
duser = api.USER.get(user_id)
if not duser:
raise fault.ItemNotFoundFault("The user could not be found")
if not isinstance(roleRef, RoleRef):
if not isinstance(role_ref, RoleRef):
raise fault.BadRequestFault("Expecting a Role Ref")
if roleRef.role_id == None:
if role_ref.role_id == None:
raise fault.BadRequestFault("Expecting a Role Id")
drole = api.ROLE.get(roleRef.role_id)
drole = api.ROLE.get(role_ref.role_id)
if drole == None:
raise fault.ItemNotFoundFault("The role not found")
if roleRef.tenant_id != None:
dtenant = api.TENANT.get(roleRef.tenant_id)
if role_ref.tenant_id != None:
dtenant = api.TENANT.get(role_ref.tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
drole_ref = models.UserRoleAssociation()
drole_ref.user_id = duser.id
drole_ref.role_id = drole.id
if roleRef.tenant_id != None:
if role_ref.tenant_id != None:
drole_ref.tenant_id = dtenant.id
user_role_ref = api.USER.user_role_add(drole_ref)
roleRef.role_ref_id = user_role_ref.id
return roleRef
role_ref.role_ref_id = user_role_ref.id
return role_ref
def delete_role_ref(self, admin_token, role_ref_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
api.ROLE.ref_delete(role_ref_id)
return None
def get_user_roles(self, admin_token, marker, limit, url, user_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
duser = api.USER.get(user_id)
if not duser:
raise fault.ItemNotFoundFault("The user could not be found")
ts = []
droleRefs = api.ROLE.ref_get_page(marker, limit, user_id)
for droleRef in droleRefs:
ts.append(RoleRef(droleRef.id, droleRef.role_id,
droleRef.tenant_id))
drole_refs = api.ROLE.ref_get_page(marker, limit, user_id)
for drole_ref in drole_refs:
ts.append(RoleRef(drole_ref.id, drole_ref.role_id,
drole_ref.tenant_id))
prev, next = api.ROLE.ref_get_page_markers(user_id, marker, limit)
links = []
if prev:
@ -596,8 +600,40 @@ class IdentityService(object):
% (url, next, limit)))
return RoleRefs(ts, links)
def add_endpoint_template(self, admin_token, endpoint_template):
self.__validate_service_or_keystone_admin_token(admin_token)
if not isinstance(endpoint_template, EndpointTemplate):
raise fault.BadRequestFault("Expecting a EndpointTemplate")
#Check if the passed service exist.
if endpoint_template.service != None and\
len(endpoint_template.service.strip()) > 0 and\
api.SERVICE.get(endpoint_template.service) == None:
raise fault.BadRequestFault(
"A service with that id doesnt exist.")
dendpoint_template = models.EndpointTemplates()
dendpoint_template.region = endpoint_template.region
dendpoint_template.service = endpoint_template.service
dendpoint_template.public_url = endpoint_template.public_url
dendpoint_template.admin_url = endpoint_template.admin_url
dendpoint_template.internal_url = endpoint_template.internal_url
dendpoint_template.enabled = endpoint_template.enabled
dendpoint_template.is_global = endpoint_template.is_global
dendpoint_template = api.ENDPOINT_TEMPLATE.create(dendpoint_template)
endpoint_template.id = dendpoint_template.id
return endpoint_template
def delete_endpoint_template(self, admin_token, endpoint_template_id):
self.__validate_service_or_keystone_admin_token(admin_token)
dendpoint_template = api.ENDPOINT_TEMPLATE.get(endpoint_template_id)
if not dendpoint_template:
raise fault.ItemNotFoundFault(
"The endpoint template could not be found")
api.ENDPOINT_TEMPLATE.delete(endpoint_template_id)
def get_endpoint_templates(self, admin_token, marker, limit, url):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
ts = []
dendpointTemplates = api.ENDPOINT_TEMPLATE.get_page(marker, limit)
@ -622,7 +658,7 @@ class IdentityService(object):
return EndpointTemplates(ts, links)
def get_endpoint_template(self, admin_token, endpoint_template_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
dendpointTemplate = api.ENDPOINT_TEMPLATE.get(endpoint_template_id)
if not dendpointTemplate:
@ -639,7 +675,7 @@ class IdentityService(object):
dendpointTemplate.is_global)
def get_tenant_endpoints(self, admin_token, marker, limit, url, tenant_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
if tenant_id == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
@ -671,7 +707,7 @@ class IdentityService(object):
def create_endpoint_for_tenant(self, admin_token,
tenant_id, endpoint_template, url):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
if tenant_id == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
if api.TENANT.get(tenant_id) == None:
@ -694,7 +730,7 @@ class IdentityService(object):
api.ENDPOINT_TEMPLATE.endpoint_delete(endpoint_id)
return None
#Service Operations
#Service Operations
def create_service(self, admin_token, service):
self.__validate_service_or_keystone_admin_token(admin_token)

View File

@ -73,8 +73,6 @@ class EndpointTemplate(object):
id = None
else:
id = endpoint_template["id"]
if id == None:
raise fault.BadRequestFault("Expecting endpointTemplate")
if 'region' in endpoint_template:
region = endpoint_template["region"]

View File

@ -80,7 +80,7 @@ class Role(object):
if self.desc:
dom.set("description", string.lower(str(self.desc)))
if self.service_id:
dom.set("serviceId", string.lower(str(self.service_id)))
dom.set("serviceId", str(self.service_id))
return dom
def to_xml(self):
@ -93,7 +93,7 @@ class Role(object):
if self.desc:
role["description"] = self.desc
if self.service_id:
role["serviceId"] = self.desc
role["serviceId"] = self.service_id
return {'role': role}
def to_json(self):
@ -156,17 +156,17 @@ class RoleRef(object):
obj = json.loads(json_str)
if not "roleRef" in obj:
raise fault.BadRequestFault("Expecting Role Ref")
roleRef = obj["roleRef"]
if not "roleId" in roleRef:
role_ref = obj["roleRef"]
if not "roleId" in role_ref:
role_id = None
else:
role_id = roleRef["roleId"]
role_id = role_ref["roleId"]
if role_id == None:
raise fault.BadRequestFault("Expecting Role")
if not "tenantId" in roleRef:
if not "tenantId" in role_ref:
tenant_id = None
else:
tenant_id = roleRef["tenantId"]
tenant_id = role_ref["tenantId"]
if tenant_id == None:
raise fault.BadRequestFault("Expecting Tenant")
return RoleRef('', role_id, tenant_id)
@ -188,14 +188,14 @@ class RoleRef(object):
return etree.tostring(self.to_dom())
def to_dict(self):
roleRef = {}
role_ref = {}
if self.role_ref_id:
roleRef["id"] = self.role_ref_id
role_ref["id"] = self.role_ref_id
if self.role_id:
roleRef["roleId"] = self.role_id
role_ref["roleId"] = self.role_id
if self.tenant_id:
roleRef["tenantId"] = self.tenant_id
return {'roleRef': roleRef}
role_ref["tenantId"] = self.tenant_id
return {'roleRef': role_ref}
def to_json(self):
return json.dumps(self.to_dict())

View File

@ -18,9 +18,7 @@ class AdminApi(wsgi.Router):
def __init__(self, options):
self.options = options
mapper = routes.Mapper()
db.configure_backends(options)
# Token Operations
auth_controller = AuthController(options)
mapper.connect("/tokens", controller=auth_controller,
@ -32,7 +30,6 @@ class AdminApi(wsgi.Router):
mapper.connect("/tokens/{token_id}", controller=auth_controller,
action="delete_token",
conditions=dict(method=["DELETE"]))
# Tenant Operations
tenant_controller = TenantController(options)
mapper.connect("/tenants", controller=tenant_controller,
@ -116,10 +113,18 @@ class AdminApi(wsgi.Router):
controller=endpoint_templates_controller,
action="get_endpoint_templates",
conditions=dict(method=["GET"]))
mapper.connect("/endpointTemplates",
controller=endpoint_templates_controller,
action="add_endpoint_template",
conditions=dict(method=["POST"]))
mapper.connect("/endpointTemplates/{endpoint_templates_id}",
controller=endpoint_templates_controller,
action="get_endpoint_template",
conditions=dict(method=["GET"]))
mapper.connect("/endpointTemplates/{endpoint_templates_id}",
controller=endpoint_templates_controller,
action="delete_endpoint_template",
conditions=dict(method=["DELETE"]))
mapper.connect("/tenants/{tenant_id}/endpoints",
controller=endpoint_templates_controller,
action="get_endpoints_for_tenant",
@ -165,10 +170,9 @@ class AdminApi(wsgi.Router):
action="get_services", conditions=dict(method=["GET"]))
mapper.connect("/services", controller=services_controller,
action="create_service", conditions=dict(method=["POST"]))
mapper.connect("/services/{service_id}",\
controller=services_controller,
action="delete_service",
conditions=dict(method=["DELETE"]))
mapper.connect("/services/{service_id}", \
controller=services_controller,
action="delete_service", conditions=dict(method=["DELETE"]))
mapper.connect("/services/{service_id}",
controller=services_controller,
action="get_service",

View File

@ -468,6 +468,10 @@ def get_global_tenant():
return 'GlobalTenant'
def get_test_service_id():
return 'exampleservice'
def handle_user_resp(self, content, respvalue, resptype):
if respvalue == 200:
if resptype == 'application/json':
@ -497,6 +501,55 @@ def create_role(roleid, auth_token):
return (resp, content)
def create_role_mapped_to_service(role_id, auth_token, service_id):
header = httplib2.Http(".cache")
url = '%sroles' % (URL_V2)
body = {"role": {"id": role_id,
"description": "A description ...",
"serviceId": service_id}}
resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_role_mapped_to_service_xml(role_id, auth_token, service_id):
header = httplib2.Http(".cache")
url = '%sroles' % (URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?>\
<role xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s" description="A Description of the role" serviceId="%s"/>\
' % (role_id, service_id)
resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token})
return (resp, content)
def get_role(role_id, auth_token):
header = httplib2.Http(".cache")
url = '%sroles/%s' % (URL_V2, role_id)
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token,
"ACCEPT": "application/json",
})
return (resp, content)
def get_role_xml(role_id, auth_token):
header = httplib2.Http(".cache")
url = '%sroles/%s' % (URL_V2, role_id)
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml",
})
return (resp, content)
def create_role_ref(user_id, role_id, tenant_id, auth_token):
header = httplib2.Http(".cache")
@ -685,5 +738,58 @@ def delete_all_endpoint(tenant_id, auth_token):
"X-Auth-Token": str(auth_token)})
def create_endpoint_template(region, service,
public_url, admin_url, internal_url, enabled, is_global, auth_token):
header = httplib2.Http(".cache")
url = '%sendpointTemplates' % (URL_V2)
body = {"endpointTemplate": {"region": region,
"serviceName": service,
"publicURL": public_url,
"adminURL": admin_url,
"internalURL": internal_url,
"enabled": enabled,
"global": is_global}}
resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_endpoint_template_xml(region, service, public_url, admin_url,
internal_url, enabled, is_global, auth_token):
header = httplib2.Http(".cache")
url = '%sendpointTemplates' % (URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?>\
<endpointTemplate xmlns="http://docs.openstack.org/identity/api/v2.0" \
region="%s" serviceName="%s" \
publicURL="%s" adminURL="%s"\
internalURL="%s" enabled="%s"\
global="%s"/>' % (region, service, public_url,\
admin_url, internal_url, enabled, is_global)
body = {"endpointTemplate": {"region": region,
"serviceName": service,
"publicURL": public_url,
"adminURL": admin_url,
"internalURL": internal_url,
"enabled": enabled,
"global": is_global}}
resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def delete_endpoint_template(endpoint_template_id, auth_token):
header = httplib2.Http(".cache")
url = '%sendpointTemplates/%s' % (URL_V2, endpoint_template_id)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": str(auth_token)})
return resp, content
if __name__ == '__main__':
unittest.main()

View File

@ -53,6 +53,89 @@ class EndpointTemplatesTest(unittest.TestCase):
utils.delete_all_endpoint(self.tenant, self.auth_token)
class CreateEndpointTemplatesTest(EndpointTemplatesTest):
def test_create_endpoint_template(self):
region = 'DFW'
service = utils.get_test_service_id()
public_url = 'public'
admin_url = 'admin'
internal_url = 'internal'
enabled = True
is_global = False
resp, content = utils.create_endpoint_template(\
region, service, public_url,\
admin_url, internal_url, enabled, is_global, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(201, int(resp['status']))
obj = json.loads(content)
if not "endpointTemplate" in obj:
raise fault.BadRequestFault("Expecting endpointTemplate")
endpoint_template = obj["endpointTemplate"]
if not "id" in endpoint_template:
endpoint_template_id = None
else:
endpoint_template_id = endpoint_template["id"]
if endpoint_template_id == None:
self.fail("Not the expected Endpoint Template")
if not "serviceName" in endpoint_template:
service_id = None
else:
service_id = endpoint_template["serviceName"]
if service_id != utils.get_test_service_id():
self.fail("Not the expected service")
resp, content = utils.delete_endpoint_template(
endpoint_template_id, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_endpoint_template_xml(self):
region = 'DFW'
service = utils.get_test_service_id()
public_url = 'public'
admin_url = 'admin'
internal_url = 'internal'
enabled = True
is_global = False
resp, content = utils.create_endpoint_template_xml(
region, service, public_url, admin_url,
internal_url, enabled, is_global, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(201, int(resp['status']))
#verify content
dom = etree.Element("root")
dom.append(etree.fromstring(content))
endpoint_template = dom.find(
"{http://docs.openstack.org/identity/api/v2.0}endpointTemplate")
if endpoint_template == None:
self.fail("Expecting endpointTemplates")
endpoint_template_id = endpoint_template.get("id")
if endpoint_template_id == None:
self.fail("Not the expected Endpoint template.")
service_id = endpoint_template.get("serviceName")
if service_id != utils.get_test_service_id():
self.fail("Not the expected service")
resp, content = utils.delete_endpoint_template(
endpoint_template_id, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
class GetEndpointTemplatesTest(EndpointTemplatesTest):
def test_get_endpoint_templates(self):
header = httplib2.Http(".cache")

View File

@ -46,6 +46,7 @@ class RolesTest(unittest.TestCase):
utils.create_user(self.tenant, self.user, self.auth_token)
self.token = utils.get_token(self.user, 'secrete', self.tenant,
'token')
self.service_id = utils.get_test_service_id()
def tearDown(self):
utils.delete_user(self.user, self.auth_token)
@ -67,6 +68,86 @@ class CreateRolesTest(RolesTest):
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_role_mapped_to_a_service(self):
resp, content = utils.create_role_mapped_to_service(
'test_role', self.auth_token, self.service_id)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(201, int(resp['status']))
resp, content = utils.get_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
resp, content = utils.get_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
if not "role" in obj:
raise fault.BadRequestFault("Expecting Role")
role = obj["role"]
if not "id" in role:
role_id = None
else:
role_id = role["id"]
if role_id != 'test_role':
self.fail("Not the expected Role")
if not "serviceId" in role:
service_id = None
else:
service_id = role["serviceId"]
if service_id != self.service_id:
self.fail("Not the expected service")
resp, content = utils.delete_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_role_mapped_to_a_service_xml(self):
resp, content = utils.create_role_mapped_to_service_xml(
'test_role', self.auth_token, self.service_id)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(201, int(resp['status']))
resp, content = utils.get_role_xml('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
dom = etree.Element("root")
dom.append(etree.fromstring(content))
role = dom.find("{http://docs.openstack.org/identity/api/v2.0}" \
"role")
if role == None:
self.fail("Expecting Role")
role_id = role.get("id")
if role_id != 'test_role':
self.fail("Not the expected Role")
service_id = role.get("serviceId")
if service_id != self.service_id:
self.fail("Not the expected service")
resp, content = utils.delete_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
class GetRolesTest(RolesTest):
def test_get_roles(self):

View File

@ -1,6 +1,6 @@
# You may need to install development files before using 'pip install'
# For example:
# sudo apt-get install python-dev libxml2-dev libxslt1-dev libsasl2d-ev libldap2-dev libsqlite3-dev
# sudo apt-get install python-dev libxml2-dev libxslt1-dev libsasl2-dev libldap2-dev libsqlite3-dev libssl-dev
# Production
eventlet
@ -27,4 +27,4 @@ coverage # computes code coverage percentages
# Testing
webtest
unittest2
pep8
pep8