Resolved conflicts

This commit is contained in:
Ziad Sawalha 2011-06-15 01:13:36 -05:00
commit 080f553755
7 changed files with 415 additions and 117 deletions

View File

@ -59,7 +59,7 @@
./keystone-manage $* token add 000999 admin 1234 2010-02-05T00:00
./keystone-manage $* token add 999888777 disabled 1234 2015-02-05T00:00
#Tenant Role
#Tenant base urls
./keystone-manage $* tenant_baseURL add 1234 1
./keystone-manage $* tenant_baseURL add 1234 2
./keystone-manage $* tenant_baseURL add 1234 3

View File

@ -847,14 +847,6 @@ def user_role_add(values):
return user_role_ref
def user_tenant_create(values):
#TODO(ZIAD): Update model / fix this
user_tenant_ref = models.UserTenantAssociation()
user_tenant_ref.update(values)
user_tenant_ref.save()
return user_tenant_ref
def user_get_update(id, session=None):
if not session:
session = get_session()
@ -862,6 +854,64 @@ def user_get_update(id, session=None):
return result
def users_get_page(marker, limit, session=None):
if not session:
session = get_session()
user = aliased(models.User)
if marker:
return session.query(user).\
filter("id>=:marker").params(
marker='%s' % marker).order_by(
"id").limit(limit).all()
else:
return session.query(user).\
order_by("id").limit(limit).all()
def users_get_page_markers(marker, limit,\
session=None):
if not session:
session = get_session()
user = aliased(models.User)
first = session.query(user).\
order_by(user.id).first()
last = session.query(user).\
order_by(user.id.desc()).first()
if first is None:
return (None, None)
if marker is None:
marker = first.id
next = session.query(user).\
filter("id > :marker").params(\
marker='%s' % marker).order_by(user.id).\
limit(int(limit)).all()
prev = session.query(user).\
filter("id < :marker").params(
marker='%s' % marker).order_by(
user.id.desc()).limit(int(limit)).all()
next_len = len(next)
prev_len = len(prev)
if next_len == 0:
next = last
else:
for t in next:
next = t
if prev_len == 0:
prev = first
else:
for t in prev:
prev = t
if first.id == marker:
prev = None
else:
prev = prev.id
if marker == last.id:
next = None
else:
next = next.id
return (prev, next)
def users_get_by_tenant_get_page(tenant_id, marker, limit, session=None):
if not session:
session = get_session()

View File

@ -432,6 +432,7 @@ class IdentityService(object):
dtenant = db_api.tenant_get(user.tenant_id)
if dtenant == None:
raise fault.UnauthorizedFault("Unauthorized")
if not dtenant.enabled:
raise fault.TenantDisabledFault("Your account has been disabled")
@ -487,6 +488,25 @@ class IdentityService(object):
(url, next, limit)))
return users.Users(ts, links)
def get_users(self, admin_token, marker, limit, url):
self.__validate_token(admin_token)
ts = []
dusers = db_api.users_get_page(marker, limit)
for duser in dusers:
ts.append(users.User(None, duser.id, duser.tenant_id,
duser.email, duser.enabled))
links = []
if ts.__len__():
prev, next = db_api.users_get_page_markers(marker, limit)
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
(url, prev, limit)))
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
(url, next, limit)))
return users.Users(ts, links)
def get_user(self, admin_token, user_id):
self.__validate_token(admin_token)
duser = db_api.user_get(user_id)
@ -498,9 +518,6 @@ class IdentityService(object):
dtenant = db_api.tenant_get(duser.tenant_id)
if dtenant != None and not dtenant.enabled:
raise fault.TenantDisabledFault("Your account has been disabled")
ts = []
dusergroups = db_api.user_groups_get_all(user_id)
@ -545,10 +562,6 @@ class IdentityService(object):
if not duser.enabled:
raise fault.UserDisabledFault("User has been disabled")
dtenant = db_api.tenant_get(user.tenant_id)
if dtenant != None and not dtenant.enabled:
raise fault.TenantDisabledFault("Your account has been disabled")
if not isinstance(user, users.User):
raise fault.BadRequestFault("Expecting a User")
@ -616,10 +629,18 @@ class IdentityService(object):
if duser == None:
raise fault.ItemNotFoundFault("The user could not be found")
dtenant = db_api.tenant_get(user.tenant_id)
#Check if tenant exists.If user has passed a tenant that does not exist throw error.
#If user is trying to update to a tenant that is disabled throw an error.
if dtenant == None and len(user.tenant_id) > 0:
raise fault.ItemNotFoundFault("The tenant not found")
elif not dtenant.enabled:
raise fault.TenantDisabledFault("Your account has been disabled")
values = {'tenant_id': user.tenant_id}
db_api.user_update(user_id, values)
return users.User_Update(None, None, user.tenant_id, None, None, None)
def delete_user(self, admin_token, user_id):
@ -629,25 +650,12 @@ class IdentityService(object):
raise fault.ItemNotFoundFault("The user could not be found")
dtenant = db_api.tenant_get(duser.tenant_id)
if dtenant != None and not dtenant.enabled:
raise fault.TenantDisabledFault("Your account has been disabled")
db_api.user_delete_tenant(user_id, dtenant.id)
return None
def get_user_groups(self, admin_token, tenant_id, user_id, marker, limit,
def get_user_groups(self, admin_token, user_id, marker, limit,
url):
self.__validate_token(admin_token)
if tenant_id == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
dtenant = db_api.tenant_get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
if not dtenant.enabled:
raise fault.TenantDisabledFault("Your account has been disabled")
ts = []
dusergroups = db_api.groups_get_by_user_get_page(user_id, marker,
limit)

2
keystone/logic/types/user.py Normal file → Executable file
View File

@ -226,7 +226,7 @@ class User_Update(object):
if self.user_id:
user["id"] = self.user_id
if self.user_id:
if self.tenant_id:
user["tenantId"] = self.tenant_id
if self.password:
user["password"] = self.password

View File

@ -325,7 +325,7 @@ class UserController(wsgi.Controller):
@utils.wrap_error
def update_user_tenant(self, req, user_id):
user = utils.get_normalized_request_content(users.User_Update, req)
service.set_user_tenant(utils.get_auth_token(req), user_id,
rval = service.set_user_tenant(utils.get_auth_token(req), user_id,
user)
return utils.send_result(200, req, rval)
@ -337,10 +337,10 @@ class UserController(wsgi.Controller):
return utils.send_result(200, req, users)
@utils.wrap_error
def get_user_groups(self, req, tenant_id, user_id):
def get_user_groups(self, req, user_id):
marker, limit, url = get_marker_limit_and_url(req)
groups = service.get_user_groups(utils.get_auth_token(req),
tenant_id, user_id, marker, limit, url)
user_id, marker, limit, url)
return utils.send_result(200, req, groups)
@ -681,7 +681,7 @@ class KeystoneAdminAPI(wsgi.Router):
controller=user_controller,
action="set_user_password",
conditions=dict(method=["PUT"]))
mapper.connect("/v2.0/{tenant_id}/users/{user_id}",
mapper.connect("/v2.0/users/{user_id}/tenant",
controller=user_controller,
action="update_user_tenant",
conditions=dict(method=["PUT"]))
@ -690,16 +690,16 @@ class KeystoneAdminAPI(wsgi.Router):
controller=user_controller,
action="set_user_enabled",
conditions=dict(method=["PUT"]))
mapper.connect("/v2.0/users/{user_id}/groups",
controller=user_controller,
action="get_user_groups",
conditions=dict(method=["GET"]))
mapper.connect("/v2.0/tenants/{tenant_id}/users",
controller=user_controller,
action="get_tenant_users",
conditions=dict(method=["GET"]))
mapper.connect("/v2.0/tenants/{tenant_id}/users/{user_id}/groups",
controller=user_controller,
action="get_user_groups",
conditions=dict(method=["GET"]))
#Global Groups
groups_controller = GroupsController(options)

View File

@ -430,6 +430,28 @@ def user_enabled_xml(user_id, auth_token):
"ACCEPT": "application/xml"})
return (resp, content)
def user_tenant_update_json(user_id, tenant_id, auth_token):
h = httplib2.Http(".cache")
url = '%susers/%s/tenant' % (URL, user_id)
data = {"user": {"tenantId": tenant_id}}
resp, content = h.request(url, "PUT", body=json.dumps(data),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def user_tenant_update_xml(user_id, tenant_id, auth_token):
h = httplib2.Http(".cache")
url = '%susers/%s/tenant' % (URL, user_id)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<user xmlns="http://docs.openstack.org/identity/api/v2.0" \
tenantId="%s" />' % (tenant_id)
resp, content = h.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def user_get_xml(user_id, auth_token):
h = httplib2.Http(".cache")
@ -450,6 +472,26 @@ def users_get_json(user_id, auth_token):
return (resp, content)
def users_get_all_xml(auth_token):
h = httplib2.Http(".cache")
url = '%susers' % (URL)
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def users_get_all_json(auth_token):
h = httplib2.Http(".cache")
url = '%susers' % (URL)
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def users_get_xml(tenant_id, auth_token):
h = httplib2.Http(".cache")
url = '%susers/%s' % (URL, tenant_id)
@ -460,18 +502,18 @@ def users_get_xml(tenant_id, auth_token):
return (resp, content)
def users_group_get_json(tenant_id, user_id, auth_token):
def users_group_get_json(user_id, auth_token):
h = httplib2.Http(".cache")
url = '%stenants/%s/users/%s/groups' % (URL, tenant_id, user_id)
url = '%susers/%s/groups' % (URL, user_id)
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def users_group_get_xml(tenant_id, user_id, auth_token):
def users_group_get_xml(user_id, auth_token):
h = httplib2.Http(".cache")
url = '%stenants/%s/users/%s/groups' % (URL, tenant_id, user_id)
url = '%susers/%s/groups' % (URL, user_id)
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,

View File

@ -472,6 +472,104 @@ class DeleteUserTest(UserTest):
self.assertEqual('application/xml', utils.content_type(resp))
'''
class GetAllUsersTest(UserTest):
def test_users_get(self):
resp, content = utils.users_get_all_json(self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(200, resp_val)
def test_users_get_xml(self):
resp, content = utils.users_get_all_xml(self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(200, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_get_expired_token(self):
resp, content = utils.users_get_all_json(self.exp_auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
def test_users_get_expired_token_xml(self):
resp, content = utils.users_get_all_xml(self.exp_auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_get_disabled_token(self):
resp, content = utils.users_get_all_json(self.disabled_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
def test_users_get_disabled_token_xml(self):
resp, content = utils.users_get_all_xml(self.disabled_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_get_missing_token(self):
resp, content = utils.users_get_all_json(self.missing_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(401, resp_val)
def test_users_get_missing_token_xml(self):
resp, content = utils.users_get_all_xml(self.missing_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(401, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_get_invalid_token(self):
resp, content = utils.users_get_all_json(self.invalid_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(404, resp_val)
def test_users_get_invalid_token_xml(self):
resp, content = utils.users_get_all_xml(self.invalid_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(404, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
class GetUsersTest(UserTest):
def test_users_get(self):
@ -597,8 +695,7 @@ class GetUsersTest(UserTest):
class GetUsersGroupTest(UserTest):
def test_users_group_get(self):
resp, content = utils.users_group_get_json(self.tenant,
self.user,
resp, content = utils.users_group_get_json(self.user,
self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -608,8 +705,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual(200, resp_val)
def test_users_group_get_xml(self):
resp, content = utils.users_group_get_xml(self.tenant,
self.user,
resp, content = utils.users_group_get_xml(self.user,
self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -620,8 +716,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_expired_token(self):
resp, content = utils.users_group_get_json(self.tenant,
self.user,
resp, content = utils.users_group_get_json(self.user,
self.exp_auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -631,8 +726,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual(403, resp_val)
def test_users_group_get_expired_token_xml(self):
resp, content = utils.users_group_get_xml(self.tenant,
self.user,
resp, content = utils.users_group_get_xml(self.user,
self.exp_auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -643,8 +737,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_disabled_token(self):
resp, content = utils.users_group_get_json(self.tenant,
self.user,
resp, content = utils.users_group_get_json(self.user,
self.disabled_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -654,8 +747,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual(403, resp_val)
def test_users_group_get_disabled_token_xml(self):
resp, content = utils.users_group_get_xml(self.tenant,
self.user,
resp, content = utils.users_group_get_xml(self.user,
self.disabled_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -666,8 +758,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_missing_token(self):
resp, content = utils.users_group_get_json(self.tenant,
self.user,
resp, content = utils.users_group_get_json(self.user,
self.missing_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -677,8 +768,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual(401, resp_val)
def test_users_group_get_missing_token_xml(self):
resp, content = utils.users_group_get_xml(self.tenant,
self.user,
resp, content = utils.users_group_get_xml(self.user,
self.missing_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -689,8 +779,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_invalid_token(self):
resp, content = utils.users_group_get_json(self.tenant,
self.user,
resp, content = utils.users_group_get_json(self.user,
self.invalid_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -700,8 +789,7 @@ class GetUsersGroupTest(UserTest):
self.assertEqual(404, resp_val)
def test_users_group_get_invalid_token_xml(self):
resp, content = utils.users_group_get_xml(self.tenant,
self.user,
resp, content = utils.users_group_get_xml(self.user,
self.invalid_token)
resp_val = int(resp['status'])
if resp_val == 500:
@ -711,30 +799,6 @@ class GetUsersGroupTest(UserTest):
self.assertEqual(404, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_disabled_tenant(self):
resp, content = utils.users_group_get_json('0000',
self.user,
self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
def test_users_group_get_disabled_tenant_xml(self):
resp, content = utils.users_group_get_xml('0000',
self.user,
self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
class UpdateUserTest(UserTest):
def test_user_update(self):
@ -1225,33 +1289,6 @@ class SetEnabledTest(UserTest):
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(400, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
'''
TODO: Right now the very first call to create a user fails.This prevents from executing test.Need to find a way.
def test_user_enabled_disabled_tenant(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_enabled_json(self.user,
str(self.auth_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
def test_user_enabled_disabled_tenant_xml(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_enabled_xml(self.user,
str(self.auth_token))
resp_val = int(resp['status'])
content = etree.fromstring(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
'''
def test_user_enabled_expired_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
@ -1353,6 +1390,167 @@ class SetEnabledTest(UserTest):
self.assertEqual(401, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
class TenantUpdateTest(UserTest):
def test_update_user_tenant(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, utils.get_another_tenant(),
str(self.auth_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(200, resp_val)
self.assertEqual(utils.get_another_tenant(), content['user']['tenantId'])
def test_update_user_tenant_xml(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_xml(self.user, utils.get_another_tenant(),
str(self.auth_token))
resp_val = int(resp['status'])
content = etree.fromstring(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(200, resp_val)
self.assertEqual(utils.get_another_tenant(), content.get("tenantId"))
self.assertEqual('application/xml', utils.content_type(resp))
def test_update_user_tenant_using_invalid_tenant(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, "unknown",
str(self.auth_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(404, resp_val)
utils.delete_user(self.user, str(self.auth_token))
def test_update_user_tenant_using_disabled_tenant(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, "disable",
str(self.auth_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(404, resp_val)
utils.delete_user(self.user, str(self.auth_token))
def test_update_user_tenant_using_missing_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, utils.get_another_tenant(),
str(self.missing_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(401, resp_val)
def test_update_user_tenant_using_invalid_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, utils.get_another_tenant(),
str(self.invalid_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(404, resp_val)
def test_update_user_tenant_using_disabled_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, utils.get_another_tenant(),
str(self.disabled_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(403, resp_val)
def test_update_user_tenant_using_exp_auth_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_json(self.user, utils.get_another_tenant(),
str(self.exp_auth_token))
resp_val = int(resp['status'])
content = json.loads(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(403, resp_val)
def test_update_user_tenant_xml_using_missing_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_xml(self.user, utils.get_another_tenant(),
str(self.missing_token))
resp_val = int(resp['status'])
content = etree.fromstring(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(401, resp_val)
def test_update_user_tenant_xml_using_invalid_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_xml(self.user, utils.get_another_tenant(),
str(self.invalid_token))
resp_val = int(resp['status'])
content = etree.fromstring(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(404, resp_val)
def test_update_user_tenant_xml_using_disabled_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_xml(self.user, utils.get_another_tenant(),
str(self.disabled_token))
resp_val = int(resp['status'])
content = etree.fromstring(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(403, resp_val)
def test_update_user_tenant_xml_using_exp_auth_token(self):
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp, content = utils.user_tenant_update_xml(self.user, utils.get_another_tenant(),
str(self.exp_auth_token))
resp_val = int(resp['status'])
content = etree.fromstring(content)
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
utils.delete_user(self.user, str(self.auth_token))
self.assertEqual(403, resp_val)
class AddUserTest(UserTest):