#16 Changes to remove unused group clls.

Change-Id: Ia5a6f74ccfa5fb9975950c63db7828441a8ba4a0

Issue #16 Pylint changes.Changes to remove unused group calls.

Change-Id: Ia5a6f74ccfa5fb9975950c63db7828441a8ba4a0
This commit is contained in:
Yogeshwar Srikrishnan 2011-07-27 18:28:53 -05:00
parent 637f3692dc
commit 979bc77e7c
31 changed files with 22 additions and 3857 deletions

View File

@ -57,17 +57,6 @@
#Global endpointTemplate
`dirname $0`/keystone-manage $* endpointTemplates add RegionOne identity http://keystone.publicinternets.com/v2.0 http://127.0.0.1:5001/v2.0 http://127.0.0.1:5000/v2.0 1 1
# Groups
#`dirname $0`/keystone-manage $* group add Admin 1234
#`dirname $0`/keystone-manage $* group add Default 1234
#`dirname $0`/keystone-manage $* group add Empty 0000
# User Group Associations
#`dirname $0`/keystone-manage $* user joeuser join Default
#`dirname $0`/keystone-manage $* user disabled join Default
#`dirname $0`/keystone-manage $* user admin join Admin
# Tokens
`dirname $0`/keystone-manage $* token add 887665443383838 joeuser 1234 2012-02-05T00:00
`dirname $0`/keystone-manage $* token add 999888777666 admin 1234 2015-02-05T00:00

View File

@ -47,8 +47,8 @@ keystone-admin-role = Admin
# server. Any valid SQLAlchemy connection string is fine.
# See: http://bit.ly/ideIpI
sql_connection = sqlite:///keystone.db
backend_entities = ['UserGroupAssociation', 'UserRoleAssociation', 'Endpoints',
'Role', 'Tenant', 'User', 'Group', 'Credentials', 'EndpointTemplates']
backend_entities = ['UserRoleAssociation', 'Endpoints',
'Role', 'Tenant', 'User', 'Credentials', 'EndpointTemplates']
# Period in seconds after which SQLAlchemy should reestablish its connection
# to the database.
@ -69,7 +69,7 @@ sql_idle_timeout = 30
ldap_url = fake://ldap.db
ldap_user = cn=Admin
ldap_password = password
backend_entities = ['Tenant', 'User', 'Group']
backend_entities = ['Tenant', 'User']
[keystone.backends.memcache]
memcache_hosts = 127.0.0.1:11211

View File

@ -74,8 +74,6 @@ class EchoApp(object):
print ' Identity :', self.envr['HTTP_X_AUTHORIZATION']
if 'HTTP_X_TENANT' in self.envr:
print ' Tenant :', self.envr['HTTP_X_TENANT']
if 'HTTP_X_GROUP' in self.envr:
print ' Group :', self.envr['HTTP_X_GROUP']
if 'HTTP_X_ROLE' in self.envr:
print ' Roles :', self.envr['HTTP_X_ROLE']

View File

@ -92,6 +92,4 @@ class UserAPI(BaseLdapAPI, BaseUserAPI):
return self._get_page_markers(marker, limit,
self.api.tenant.get_users(tenant_id))
add_redirects(locals(), SQLUserAPI, ['get_by_group', 'tenant_group',
'tenant_group_delete', 'user_groups_get_all',
'users_tenant_group_get_page', 'users_tenant_group_get_page_markers'])
add_redirects(locals(), SQLUserAPI, [])

View File

@ -18,9 +18,6 @@
import ast
import logging
from sqlalchemy import create_engine
from sqlalchemy.orm import joinedload, aliased, sessionmaker
from keystone.common import config
from keystone.backends.alterdb import models
import keystone.utils as utils

View File

@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystone.backends.memcache import MEMCACHE_SERVER, models
from keystone.backends.api import BaseTokenAPI

View File

@ -15,24 +15,19 @@
# limitations under the License.
#Current Models
UserGroupAssociation = None
UserRoleAssociation = None
Endpoints = None
Role = None
Tenant = None
User = None
Credentials = None
Group = None
Token = None
EndpointTemplates = None
# Function to dynamically set model references.
def set_value(variable_name, value):
if variable_name == 'UserGroupAssociation':
global UserGroupAssociation
UserGroupAssociation = value
elif variable_name == 'UserRoleAssociation':
if variable_name == 'UserRoleAssociation':
global UserRoleAssociation
UserRoleAssociation = value
elif variable_name == 'Endpoints':
@ -50,9 +45,6 @@ def set_value(variable_name, value):
elif variable_name == 'Credentials':
global Credentials
Credentials = value
elif variable_name == 'Group':
global Group
Group = value
elif variable_name == 'Token':
global Token
Token = value

View File

@ -1 +1 @@
import endpoint_template, group, role, tenant_group, tenant, token, user
import endpoint_template, role, tenant, token, user

View File

@ -1,176 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.backends.sqlalchemy import get_session, models, aliased
from keystone.backends.api import BaseGroupAPI
class GroupAPI(BaseGroupAPI):
def get(self, id, session=None):
if not session:
session = get_session()
result = session.query(models.Group).filter_by(id=id).first()
return result
def get_users(self, id, session=None):
if not session:
session = get_session()
result = session.query(models.User).filter_by(\
group_id=id)
return result
def get_all(self, session=None):
if not session:
session = get_session()
result = session.query(models.Group)
return result
def get_page(self, marker, limit, session=None):
if not session:
session = get_session()
if marker:
return session.query(models.Group).filter("id>:marker").params(\
marker='%s' % marker).order_by(\
models.Group.id.desc()).limit(limit).all()
else:
return session.query(models.Group).order_by(\
models.Group.id.desc()).limit(limit).all()
def get_page_markers(self, marker, limit, session=None):
if not session:
session = get_session()
first = session.query(models.Group).order_by(\
models.Group.id).first()
last = session.query(models.Group).order_by(\
models.Group.id.desc()).first()
if first is None:
return (None, None)
if marker is None:
marker = first.id
next = session.query(models.Group).filter("id > :marker").params(\
marker='%s' % marker).order_by(\
models.Group.id).limit(limit).all()
prev = session.query(models.Group).filter("id < :marker").params(\
marker='%s' % marker).order_by(\
models.Group.id.desc()).limit(int(limit)).all()
if len(next) == 0:
next = last
else:
for t in next:
next = t
if len(prev) == 0:
prev = first
else:
for t in prev:
prev = t
if prev.id == marker:
prev = None
else:
prev = prev.id
if next.id == last.id:
next = None
else:
next = next.id
return (prev, next)
def delete(self, id, session=None):
if not session:
session = get_session()
with session.begin():
group_ref = self.get(id, session)
session.delete(group_ref)
def get_by_user_get_page(self, user_id, marker, limit, session=None):
if not session:
session = get_session()
uga = aliased(models.UserGroupAssociation)
group = aliased(models.Group)
if marker:
return session.query(group, uga).join(\
(uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).\
filter("id>=:marker").params(
marker='%s' % marker).order_by(
group.id).limit(limit).all()
else:
return session.query(group, uga).\
join((uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).order_by(
group.id).limit(limit).all()
def get_by_user_get_page_markers(self, user_id, marker, limit, session=None):
if not session:
session = get_session()
uga = aliased(models.UserGroupAssociation)
group = aliased(models.Group)
first, _firstassoc = session.query(group, uga).\
join((uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).\
order_by(group.id).first()
last, _lastassoc = session.query(group, uga).\
join((uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).\
order_by(group.id.desc()).first()
if first is None:
return (None, None)
if marker is None:
marker = first.id
next = session.query(group, uga).join(
(uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).\
filter("id>=:marker").params(
marker='%s' % marker).order_by(
group.id).limit(int(limit)).all()
prev = session.query(group, uga).join(
(uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).\
filter("id < :marker").params(
marker='%s' % marker).order_by(
group.id).limit(int(limit) + 1).all()
next_len = len(next)
prev_len = len(prev)
if next_len == 0:
next = last
else:
for t, _a in next:
next = t
if prev_len == 0:
prev = first
else:
for t, _a in prev:
prev = t
if first.id == marker:
prev = None
else:
prev = prev.id
if marker == last.id:
next = None
else:
next = next.id
return (prev, next)
def get():
return GroupAPI()

View File

@ -157,9 +157,6 @@ class TenantAPI(BaseTenantAPI):
tenant_id=id).first()
if a_user != None:
return False
a_group = session.query(models.Group).filter_by(tenant_id=id).first()
if a_group != None:
return False
a_user = session.query(models.User).filter_by(tenant_id=id).first()
if a_user != None:
return False

View File

@ -1,123 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.backends.sqlalchemy import get_session, models
from keystone.backends.api import BaseTenantGroupAPI
class TenantGroupAPI(BaseTenantGroupAPI):
def create(self, values):
group_ref = models.Group()
group_ref.update(values)
group_ref.save()
return group_ref
def is_empty(self, id, session=None):
if not session:
session = get_session()
a_user = session.query(models.UserGroupAssociation).filter_by(
group_id=id).first()
if a_user != None:
return False
return True
def get(self, id, tenant, session=None):
if not session:
session = get_session()
result = session.query(models.Group).filter_by(id=id, \
tenant_id=tenant).first()
return result
def get_page(self, tenantId, marker, limit, session=None):
if not session:
session = get_session()
if marker:
return session.query(models.Group).filter("id>:marker").params(\
marker='%s' % marker).filter_by(\
tenant_id=tenantId).order_by(\
models.Group.id.desc()).limit(limit).all()
else:
return session.query(models.Group).filter_by(tenant_id=tenantId)\
.order_by(models.Group.id.desc()).limit(limit).all()
#return session.query(models.Tenant).all()
def get_page_markers(self, tenantId, marker, limit, session=None):
if not session:
session = get_session()
first = session.query(models.Group).filter_by(\
tenant_id=tenantId).order_by(\
models.Group.id).first()
last = session.query(models.Group).filter_by(\
tenant_id=tenantId).order_by(\
models.Group.id.desc()).first()
if first is None:
return (None, None)
if marker is None:
marker = first.id
next = session.query(models.Group).filter("id > :marker").params(\
marker='%s' % marker).filter_by(\
tenant_id=tenantId).order_by(\
models.Group.id).limit(limit).all()
prev = session.query(models.Group).filter("id < :marker").params(\
marker='%s' % marker).filter_by(\
tenant_id=tenantId).order_by(\
models.Group.id.desc()).limit(int(limit)).all()
if len(next) == 0:
next = last
else:
for t in next:
next = t
if len(prev) == 0:
prev = first
else:
for t in prev:
prev = t
if prev.id == marker:
prev = None
else:
prev = prev.id
if next.id == last.id:
next = None
else:
next = next.id
return (prev, next)
def update(self, id, tenant_id, values, session=None):
if not session:
session = get_session()
with session.begin():
tenant_ref = self.get(id, tenant_id, session)
tenant_ref.update(values)
tenant_ref.save(session=session)
def delete(self, id, tenant_id, session=None):
if not session:
session = get_session()
with session.begin():
tenantgroup_ref = self.get(id, tenant_id, session)
session.delete(tenantgroup_ref)
def get():
return TenantGroupAPI()

View File

@ -26,31 +26,6 @@ class UserAPI(BaseUserAPI):
result = session.query(models.User)
return result
def get_by_group(self, user_id, group_id, session=None):
if not session:
session = get_session()
result = session.query(models.UserGroupAssociation).filter_by(\
group_id=group_id, user_id=user_id).first()
return result
def tenant_group(self, values):
user_ref = models.UserGroupAssociation()
user_ref.update(values)
user_ref.save()
return user_ref
def tenant_group_delete(self, id, group_id, session=None):
if not session:
session = get_session()
with session.begin():
usertenantgroup_ref = self.get_by_group(id, group_id, session)
if usertenantgroup_ref is not None:
session.delete(usertenantgroup_ref)
def create(self, values):
user_ref = models.User()
self.__check_and_use_hashed_password(values)
@ -67,9 +42,6 @@ class UserAPI(BaseUserAPI):
def get(self, id, session=None):
if not session:
session = get_session()
#TODO(Ziad): finish cleaning up model
# result = session.query(models.User).options(joinedload('groups')).\
# options(joinedload('tenants')).filter_by(id=id).first()
result = session.query(models.User).filter_by(id=id).first()
return result
@ -132,14 +104,6 @@ class UserAPI(BaseUserAPI):
return result
def get_groups(self, id, session=None):
if not session:
session = get_session()
result = session.query(models.Group).filter_by(\
user_id=id)
return result
def user_roles_by_tenant(self, user_id, tenant_id, session=None):
if not session:
session = get_session()
@ -157,72 +121,6 @@ class UserAPI(BaseUserAPI):
user_ref.update(values)
user_ref.save(session=session)
def users_tenant_group_get_page(self, group_id, marker, limit, session=None):
if not session:
session = get_session()
uga = aliased(models.UserGroupAssociation)
user = aliased(models.User)
if marker:
return session.query(user, uga).join(\
(uga, uga.user_id == user.id)).\
filter(uga.group_id == group_id).\
filter("id>=:marker").params(\
marker='%s' % marker).order_by(\
user.id).limit(limit).all()
else:
return session.query(user, uga).\
join((uga, uga.user_id == user.id)).\
filter(uga.group_id == group_id).order_by(\
user.id).limit(limit).all()
def users_tenant_group_get_page_markers(self, group_id, marker, limit, session=None):
if not session:
session = get_session()
uga = aliased(models.UserGroupAssociation)
user = aliased(models.User)
first = session.query(models.User).order_by(\
models.User.id).first()
last = session.query(models.User).order_by(\
models.User.id.desc()).first()
if first is None:
return (None, None)
if marker is None:
marker = first.id
next_page = session.query(user).join(
(uga, uga.user_id == user.id)).\
filter(uga.group_id == group_id).\
filter("id > :marker").params(\
marker='%s' % marker).order_by(\
user.id).limit(limit).all()
prev_page = session.query(user).join(\
(uga, uga.user_id == user.id)).\
filter(uga.group_id == group_id).\
filter("id < :marker").params(\
marker='%s' % marker).order_by(\
user.id.desc()).limit(int(limit)).all()
if len(next_page) == 0:
next_page = last
else:
for t in next_page:
next_page = t
if len(prev_page) == 0:
prev_page = first
else:
for t in prev_page:
prev_page = t
if prev_page.id == marker:
prev_page = None
else:
prev_page = prev_page.id
if next_page.id == last.id:
next_page = None
else:
next_page = next_page.id
return (prev_page, next_page)
def delete(self, id, session=None):
if not session:
session = get_session()
@ -248,14 +146,6 @@ class UserAPI(BaseUserAPI):
else:
return None
def get_group_by_tenant(self, id, session=None):
if not session:
session = get_session()
user_group = session.query(models.Group).filter_by(tenant_id=id).all()
return user_group
def delete_tenant_user(self, id, tenant_id, session=None):
if not session:
session = get_session()
@ -265,17 +155,6 @@ class UserAPI(BaseUserAPI):
for user_tenant_ref in users_tenant_ref:
session.delete(user_tenant_ref)
user_group_ref = self.get_group_by_tenant(tenant_id, session)
if user_group_ref is not None:
for user_group in user_group_ref:
get_users = session.query(models.UserGroupAssociation)\
.filter_by(user_id=id,
group_id=user_group.id).all()
for group_user in get_users:
session.delete(group_user)
def users_get_by_tenant(self, user_id, tenant_id, session=None):
if not session:
session = get_session()
@ -433,15 +312,5 @@ class UserAPI(BaseUserAPI):
next_page = next_page.id
return (prev_page, next_page)
def user_groups_get_all(self, user_id, session=None):
if not session:
session = get_session()
uga = aliased(models.UserGroupAssociation)
group = aliased(models.Group)
return session.query(group, uga).\
join((uga, uga.group_id == group.id)).\
filter(uga.user_id == user_id).order_by(
group.id).all()
def get():
return UserAPI()

View File

@ -79,13 +79,6 @@ class KeystoneBase(object):
# Define associations first
class UserGroupAssociation(Base, KeystoneBase):
__tablename__ = 'user_group_association'
__api__ = 'tenant_group'
user_id = Column(String(255), ForeignKey('users.id'), primary_key=True)
group_id = Column(String(255), ForeignKey('groups.id'), primary_key=True)
class UserRoleAssociation(Base, KeystoneBase):
__tablename__ = 'user_roles'
id = Column(Integer, primary_key=True)
@ -119,7 +112,6 @@ class Tenant(Base, KeystoneBase):
id = Column(String(255), primary_key=True, unique=True)
desc = Column(String(255))
enabled = Column(Integer)
groups = relationship('Group', backref='tenants')
endpoints = relationship('Endpoints', backref='tenant', cascade="all")
@ -131,8 +123,6 @@ class User(Base, KeystoneBase):
email = Column(String(255))
enabled = Column(Integer)
tenant_id = Column(String(255), ForeignKey('tenants.id'))
groups = relationship(UserGroupAssociation, backref='users')
roles = relationship(UserRoleAssociation, cascade="all")
@ -145,13 +135,6 @@ class Credentials(Base, KeystoneBase):
secret = Column(String(255))
class Group(Base, KeystoneBase):
__tablename__ = 'groups'
__api__ = 'group'
id = Column(String(255), primary_key=True, unique=True)
desc = Column(String(255))
tenant_id = Column(String(255), ForeignKey('tenants.id'))
class Token(Base, KeystoneBase):
__tablename__ = 'token'
__api__ = 'token'

View File

@ -1,66 +0,0 @@
from keystone import utils
from keystone.common import wsgi
import keystone.config as config
from keystone.logic.types.tenant import GlobalGroup
from . import get_marker_limit_and_url
class GroupsController(wsgi.Controller):
"""Controller for Group related operations"""
def __init__(self, options):
self.options = options
@utils.wrap_error
def create_group(self, req):
group = utils.get_normalized_request_content(GlobalGroup, req)
return utils.send_result(201, req,
config.SERVICE.create_global_group(utils.get_auth_token(req),
group))
@utils.wrap_error
def get_groups(self, req):
marker, limit, url = get_marker_limit_and_url(req)
groups = config.SERVICE.get_global_groups(
utils.get_auth_token(req), marker, limit, url)
return utils.send_result(200, req, groups)
@utils.wrap_error
def get_group(self, req, group_id):
tenant = config.SERVICE.get_global_group(utils.get_auth_token(req),
group_id)
return utils.send_result(200, req, tenant)
@utils.wrap_error
def update_group(self, req, group_id):
group = utils.get_normalized_request_content(GlobalGroup, req)
rval = config.SERVICE.update_global_group(
utils.get_auth_token(req), group_id, group)
return utils.send_result(200, req, rval)
@utils.wrap_error
def delete_group(self, req, group_id):
rval = config.SERVICE.delete_global_group(utils.get_auth_token(req),
group_id)
return utils.send_result(204, req, rval)
@utils.wrap_error
def get_users_global_group(self, req, group_id):
marker, limit, url = get_marker_limit_and_url(req)
users = config.SERVICE.get_users_global_group(
utils.get_auth_token(req), group_id, marker, limit, url)
return utils.send_result(200, req, users)
@utils.wrap_error
def add_user_global_group(self, req, group_id, user_id):
return utils.send_result(201, req,
config.SERVICE.add_user_global_group(utils.get_auth_token(req),
group_id, user_id))
@utils.wrap_error
def delete_user_global_group(self, req, group_id, user_id):
return utils.send_result(204, req,
config.SERVICE.delete_user_global_group(utils.get_auth_token(req),
group_id, user_id))

View File

@ -1,11 +1,11 @@
from keystone import utils
from keystone.common import wsgi
import keystone.config as config
from keystone.logic.types.tenant import Tenant, Group
from keystone.logic.types.tenant import Tenant
from . import get_marker_limit_and_url
class TenantController(wsgi.Controller):
"""Controller for Tenant and Tenant Group related operations"""
"""Controller for Tenant related operations"""
def __init__(self, options):
self.options = options
@ -41,54 +41,3 @@ class TenantController(wsgi.Controller):
rval = config.SERVICE.delete_tenant(utils.get_auth_token(req),
tenant_id)
return utils.send_result(204, req, rval)
@utils.wrap_error
def create_tenant_group(self, req, tenant_id):
group = utils.get_normalized_request_content(Group, req)
return utils.send_result(201, req, config.SERVICE.create_tenant_group(
utils.get_auth_token(req), tenant_id, group))
@utils.wrap_error
def get_tenant_groups(self, req, tenant_id):
marker, limit, url = get_marker_limit_and_url(req)
groups = config.SERVICE.get_tenant_groups(utils.get_auth_token(req),
tenant_id, marker, limit, url)
return utils.send_result(200, req, groups)
@utils.wrap_error
def get_tenant_group(self, req, tenant_id, group_id):
tenant = config.SERVICE.get_tenant_group(utils.get_auth_token(req),
tenant_id, group_id)
return utils.send_result(200, req, tenant)
@utils.wrap_error
def update_tenant_group(self, req, tenant_id, group_id):
group = utils.get_normalized_request_content(Group, req)
rval = config.SERVICE.update_tenant_group(utils.get_auth_token(req),
tenant_id, group_id, group)
return utils.send_result(200, req, rval)
@utils.wrap_error
def delete_tenant_group(self, req, tenant_id, group_id):
rval = config.SERVICE.delete_tenant_group(utils.get_auth_token(req),
tenant_id, group_id)
return utils.send_result(204, req, rval)
@utils.wrap_error
def get_users_tenant_group(self, req, tenant_id, group_id):
marker, limit, url = get_marker_limit_and_url(req)
users = config.SERVICE.get_users_tenant_group(
utils.get_auth_token(req), tenant_id, group_id, marker, limit, url)
return utils.send_result(200, req, users)
@utils.wrap_error
def add_user_tenant_group(self, req, tenant_id, group_id, user_id):
return utils.send_result(201, req,
config.SERVICE.add_user_tenant_group(utils.get_auth_token(req),
tenant_id, group_id, user_id))
@utils.wrap_error
def delete_user_tenant_group(self, req, tenant_id, group_id, user_id):
return utils.send_result(204, req,
config.SERVICE.delete_user_tenant_group(utils.get_auth_token(req),
tenant_id, group_id, user_id))

View File

@ -67,10 +67,3 @@ class UserController(wsgi.Controller):
users = config.SERVICE.get_tenant_users(utils.get_auth_token(req),
tenant_id, marker, limit, url)
return utils.send_result(200, req, users)
@utils.wrap_error
def get_user_groups(self, req, user_id):
marker, limit, url = get_marker_limit_and_url(req)
groups = config.SERVICE.get_user_groups(utils.get_auth_token(req),
user_id, marker, limit, url)
return utils.send_result(200, req, groups)

View File

@ -21,8 +21,8 @@ import keystone.backends as backends
import keystone.backends.api as api
import keystone.backends.models as models
from keystone.logic.types import fault
from keystone.logic.types.tenant import GlobalGroup, GlobalGroups, Group, \
Groups, Tenant, Tenants, User as TenantUser
from keystone.logic.types.tenant import \
Tenant, Tenants, User as TenantUser
from keystone.logic.types.role import Role, RoleRef, RoleRefs, Roles
from keystone.logic.types.user import User, User_Update, Users
from keystone.logic.types.endpoint import Endpoint, Endpoints, \
@ -196,224 +196,11 @@ class IdentityService(object):
if not api.tenant.is_empty(tenant_id):
raise fault.ForbiddenFault("You may not delete a tenant that "
"contains get_users or groups")
"contains get_users")
api.tenant.delete(dtenant.id)
return None
#
# Tenant Group Operations
#
def create_tenant_group(self, admin_token, tenant, group):
self.__validate_admin_token(admin_token)
if not isinstance(group, Group):
raise fault.BadRequestFault("Expecting a Group")
if tenant == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
dtenant = api.tenant.get(tenant)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
if group.group_id == None:
raise fault.BadRequestFault("Expecting a Group Id")
if api.group.get(group.group_id) != None:
raise fault.TenantGroupConflictFault(
"A tenant group with that id already exists")
dtenant = models.Group()
dtenant.id = group.group_id
dtenant.desc = group.description
dtenant.tenant_id = tenant
api.tenant_group.create(dtenant)
return Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
def get_tenant_groups(self, admin_token, tenant_id, marker, limit, url):
self.__validate_admin_token(admin_token)
if tenant_id == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
dtenant = api.tenant.get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
ts = []
dtenantgroups = api.tenant_group.get_page(tenant_id, marker, limit)
for dtenantgroup in dtenantgroups:
ts.append(Group(dtenantgroup.id,
dtenantgroup.desc,
dtenantgroup.tenant_id))
prev, next = api.tenant_group.get_page_markers(tenant_id, marker,
limit)
links = []
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" \
% (url, prev, limit)))
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'"\
% (url, next, limit)))
return Groups(ts, links)
def get_tenant_group(self, admin_token, tenant_id, group_id):
self.__validate_admin_token(admin_token)
dtenant = api.tenant.get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
dtenant = api.tenant_group.get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
return Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
def update_tenant_group(self, admin_token, tenant_id, group_id, group):
self.__validate_admin_token(admin_token)
if not isinstance(group, Group):
raise fault.BadRequestFault("Expecting a Group")
True
dtenant = api.tenant.get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
dtenant = api.tenant_group.get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
if group_id != group.group_id:
raise fault.BadRequestFault("Wrong Data Provided,\
Group id not matching")
if str(tenant_id) != str(group.tenant_id):
raise fault.BadRequestFault("Wrong Data Provided,\
Tenant id not matching ")
values = {'desc': group.description}
api.tenant_group.update(group_id, tenant_id, values)
return Group(group_id, group.description, tenant_id)
def delete_tenant_group(self, admin_token, tenant_id, group_id):
self.__validate_admin_token(admin_token)
dtenant = api.tenant.get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
dtenant = api.tenant_group.get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
if not api.tenant_group.is_empty(group_id):
raise fault.ForbiddenFault("You may not delete a tenant that "
"contains get_users or groups")
api.tenant_group.delete(group_id, tenant_id)
return None
def get_users_tenant_group(self, admin_token, tenantId, groupId, marker,
limit, url):
self.__validate_admin_token(admin_token)
if tenantId == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
if api.tenant.get(tenantId) == None:
raise fault.ItemNotFoundFault("The tenant not found")
if api.tenant_group.get(groupId, tenantId) == None:
raise fault.ItemNotFoundFault(
"A tenant group with that id not found")
ts = []
dgroupusers = api.user.users_tenant_group_get_page(groupId, marker,
limit)
for dgroupuser, _dgroupuserAsso in dgroupusers:
# TODO: TenantUser is deprecated, and a near-duplicate of
# keystone.logic.types.user.User
ts.append(TenantUser(
user_id=dgroupuser.id,
email=dgroupuser.email,
enabled=dgroupuser.enabled,
tenant_id=tenantId,
group_id=None))
links = []
if ts.__len__():
prev, next = api.user.users_tenant_group_get_page_markers(
groupId, marker, limit)
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
(url, prev, limit)))
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
(url, next, limit)))
return Users(ts, links)
def add_user_tenant_group(self, admin_token, tenant, group, user):
self.__validate_admin_token(admin_token)
if api.tenant.get(tenant) == None:
raise fault.ItemNotFoundFault("The Tenant not found")
if api.group.get(group) == None:
raise fault.ItemNotFoundFault("The Group not found")
duser = api.user.get(user)
if duser == None:
raise fault.ItemNotFoundFault("The User not found")
if api.tenant_group.get(group, tenant) == None:
raise fault.ItemNotFoundFault("A tenant group with"
" that id not found")
if api.user.get_by_group(user, group) != None:
raise fault.UserGroupConflictFault(
"A user with that id already exists in group")
dusergroup = models.UserGroupAssociation()
dusergroup.user_id = user
dusergroup.group_id = group
api.user.tenant_group(dusergroup)
# TODO: TenantUser is deprecated, and a near-duplicate of
# keystone.logic.types.user.User
return TenantUser(
user_id=duser.id,
email=duser.email,
enabled=duser.enabled,
tenant_id=tenant,
group_id=group) #attribute no longer exists
def delete_user_tenant_group(self, admin_token, tenant, group, user):
self.__validate_admin_token(admin_token)
if api.tenant.get(tenant) == None:
raise fault.ItemNotFoundFault("The Tenant not found")
if api.group.get(group) == None:
raise fault.ItemNotFoundFault("The Group not found")
duser = api.user.get(user)
if duser == None:
raise fault.ItemNotFoundFault("The User not found")
if api.tenant_group.get(group, tenant) == None:
raise fault.ItemNotFoundFault("A tenant group with"
" that id not found")
if api.user.get_by_group(user, group) == None:
raise fault.ItemNotFoundFault("A user with that id "
"in a group not found")
api.user.tenant_group_delete(user, group)
return None
#
# Private Operations
#
@ -526,13 +313,8 @@ class IdentityService(object):
dtenant = api.tenant.get(duser.tenant_id)
ts = []
dusergroups = api.user.user_groups_get_all(user_id)
for dusergroup, _dusergroupAsso in dusergroups:
ts.append(Group(dusergroup.id, dusergroup.tenant_id, None))
return User_Update(None, duser.id, duser.tenant_id,
duser.email, duser.enabled, ts)
duser.email, duser.enabled)
def update_user(self, admin_token, user_id, user):
self.__validate_admin_token(admin_token)
@ -575,7 +357,7 @@ class IdentityService(object):
api.user.update(user_id, values)
return User_Update(user.password,
None, None, None, None, None)
None, None, None, None)
def enable_disable_user(self, admin_token, user_id, user):
self.__validate_admin_token(admin_token)
@ -594,7 +376,7 @@ class IdentityService(object):
api.user.update(user_id, values)
return User_Update(None,
None, None, None, user.enabled, None)
None, None, None, user.enabled)
def set_user_tenant(self, admin_token, user_id, user):
self.__validate_admin_token(admin_token)
@ -612,7 +394,7 @@ class IdentityService(object):
values = {'tenant_id': user.tenant_id}
api.user.update(user_id, values)
return User_Update(None,
None, user.tenant_id, None, None, None)
None, user.tenant_id, None, None)
def delete_user(self, admin_token, user_id):
self.__validate_admin_token(admin_token)
@ -627,231 +409,6 @@ class IdentityService(object):
api.user.delete(user_id)
return None
def get_user_groups(self, admin_token, user_id, marker, limit,
url):
self.__validate_admin_token(admin_token)
ts = []
dusergroups = api.group.get_by_user_get_page(user_id, marker,
limit)
for dusergroup, _dusergroupAsso in dusergroups:
ts.append(Group(dusergroup.id, dusergroup.desc,
dusergroup.tenant_id))
links = []
if ts.__len__():
prev, next = api.group.get_by_user_get_page_markers(user_id,
marker, limit)
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
(url, prev, limit)))
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
(url, next, limit)))
return Groups(ts, links)
#
# Global Group Operations
# TODO:(India Team) Rename functions
# and to maintain consistency
# with server.py
def __check_create_global_tenant(self):
dtenant = api.tenant.get('GlobalTenant')
if dtenant is None:
dtenant = models.Tenant()
dtenant.id = 'GlobalTenant'
dtenant.desc = 'GlobalTenant is Default tenant for global groups'
dtenant.enabled = True
api.tenant.create(dtenant)
return dtenant
def create_global_group(self, admin_token, group):
self.__validate_admin_token(admin_token)
if not isinstance(group, GlobalGroup):
raise fault.BadRequestFault("Expecting a Group")
if group.group_id == None:
raise fault.BadRequestFault("Expecting a Group Id")
if api.group.get(group.group_id) != None:
raise fault.TenantGroupConflictFault(
"A tenant group with that id already exists")
gtenant = self.__check_create_global_tenant()
dtenant = models.Group()
dtenant.id = group.group_id
dtenant.desc = group.description
dtenant.tenant_id = gtenant.id
api.tenant_group.create(dtenant)
return GlobalGroup(dtenant.id, dtenant.desc, None)
def get_global_groups(self, admin_token, marker, limit, url):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
ts = []
dtenantgroups = api.tenant_group.get_page(gtenant.id, \
marker, limit)
for dtenantgroup in dtenantgroups:
ts.append(GlobalGroup(dtenantgroup.id,
dtenantgroup.desc))
prev, next = api.tenant_group.get_page_markers(gtenant.id,
marker, limit)
links = []
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
(url, prev, limit)))
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
(url, next, limit)))
return GlobalGroups(ts, links)
def get_global_group(self, admin_token, group_id):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
dtenant = api.tenant.get(gtenant.id)
if dtenant == None:
raise fault.ItemNotFoundFault("The Global tenant not found")
dtenant = api.tenant_group.get(group_id, gtenant.id)
if not dtenant:
raise fault.ItemNotFoundFault("The Global tenant group not found")
return GlobalGroup(dtenant.id, dtenant.desc)
def update_global_group(self, admin_token, group_id, group):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
if not isinstance(group, GlobalGroup):
raise fault.BadRequestFault("Expecting a Group")
dtenant = api.tenant.get(gtenant.id)
if dtenant == None:
raise fault.ItemNotFoundFault("The global tenant not found")
dtenant = api.tenant_group.get(group_id, gtenant.id)
if not dtenant:
raise fault.ItemNotFoundFault("The Global tenant group not found")
if group_id != group.group_id:
raise fault.BadRequestFault("Wrong Data Provided,"
"Group id not matching")
values = {'desc': group.description}
api.tenant_group.update(group_id, gtenant.id, values)
return GlobalGroup(group_id, group.description, gtenant.id)
def delete_global_group(self, admin_token, group_id):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
dtenant = api.tenant.get(gtenant.id)
if dtenant == None:
raise fault.ItemNotFoundFault("The global tenant not found")
dtenant = api.tenant_group.get(group_id, dtenant.id)
if not dtenant:
raise fault.ItemNotFoundFault("The global tenant group not found")
if not api.tenant_group.is_empty(group_id):
raise fault.ForbiddenFault("You may not delete a group that "
"contains get_users")
api.tenant_group.delete(group_id, gtenant.id)
return None
def get_users_global_group(self, admin_token, groupId, marker, limit, url):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
if gtenant.id == None:
raise fault.BadRequestFault("Expecting a global Tenant")
if api.tenant.get(gtenant.id) == None:
raise fault.ItemNotFoundFault("The global tenant not found")
if api.tenant_group.get(groupId, gtenant.id) == None:
raise fault.ItemNotFoundFault(
"A global tenant group with that id not found")
ts = []
dgroupusers = api.user.users_tenant_group_get_page(groupId, marker,
limit)
for dgroupuser, _dgroupuserassoc in dgroupusers:
# TODO: TenantUser is deprecated, and a near-duplicate of
# keystone.logic.types.user.User
ts.append(TenantUser(
user_id=dgroupuser.id,
email=dgroupuser.email,
enabled=dgroupuser.enabled))
links = []
if ts.__len__():
prev, next = api.user.users_tenant_group_get_page_markers(groupId,
marker, limit)
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'"
% (url, prev, limit)))
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'"
% (url, next, limit)))
return Users(ts, links)
def add_user_global_group(self, admin_token, group, user):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
if api.tenant.get(gtenant.id) == None:
raise fault.ItemNotFoundFault("The Global Tenant not found")
if api.group.get(group) == None:
raise fault.ItemNotFoundFault("The Group not found")
duser = api.user.get(user)
if duser == None:
raise fault.ItemNotFoundFault("The User not found")
if api.tenant_group.get(group, gtenant.id) == None:
raise fault.ItemNotFoundFault("A global tenant group with"
" that id not found")
if api.user.get_by_group(user, group) != None:
raise fault.UserGroupConflictFault(
"A user with that id already exists in group")
dusergroup = models.UserGroupAssociation()
dusergroup.user_id = user
dusergroup.group_id = group
api.user.tenant_group(dusergroup)
# TODO: TenantUser is deprecated, and a near-duplicate of
# keystone.logic.types.user.User
return TenantUser(
user_id=duser.id,
email=duser.email,
enabled=duser.enabled,
group_id=group) # attribute no longer exists!
def delete_user_global_group(self, admin_token, group, user):
self.__validate_admin_token(admin_token)
gtenant = self.__check_create_global_tenant()
if api.tenant.get(gtenant.id) == None:
raise fault.ItemNotFoundFault("The Global Tenant not found")
if api.group.get(group) == None:
raise fault.ItemNotFoundFault("The Group not found")
duser = api.user.get(user)
if duser == None:
raise fault.ItemNotFoundFault("The User not found")
if api.tenant_group.get(group, gtenant.id) == None:
raise fault.ItemNotFoundFault("A global tenant group with "
"that id not found")
if api.user.get_by_group(user, group) == None:
raise fault.ItemNotFoundFault("A user with that id in a "
"group not found")
api.user.tenant_group_delete(user, group)
return None
def __get_auth_data(self, dtoken, tenant_id):
"""return AuthData object for a token"""
endpoints = None
@ -875,7 +432,7 @@ class IdentityService(object):
for droleRef in droleRefs:
ts.append(RoleRef(droleRef.id, droleRef.role_id,
droleRef.tenant_id))
user = auth.User(duser.id, duser.tenant_id, None, RoleRefs(ts, []))
user = auth.User(duser.id, duser.tenant_id, RoleRefs(ts, []))
return auth.ValidateData(token, user)
def __validate_tenant(self, tenant_id):

View File

@ -87,29 +87,12 @@ class Token(object):
self.tenant_id = tenant_id
class Group(object):
"""A group, optionally belonging to a tenant."""
def __init__(self, group_id, tenant_id):
self.tenant_id = tenant_id
self.group_id = group_id
class Groups(object):
"""A collection of groups."""
def __init__(self, values, links):
self.values = values
self.links = links
class User(object):
"""A user."""
def __init__(self, username, tenant_id, groups, role_refs=None):
def __init__(self, username, tenant_id,role_refs=None):
self.username = username
self.tenant_id = tenant_id
self.groups = groups
self.role_refs = role_refs
@ -211,14 +194,6 @@ class ValidateData(object):
user = etree.Element("user",
username=self.user.username,
tenantId=str(self.user.tenant_id))
"""groups = etree.Element("groups")
for group in self.user.groups.values:
g = etree.Element("group",
tenantId=group.tenant_id)
g.set("id", group.group_id)
groups.append(g)
user.append(groups)
"""
if self.user.role_refs != None:
user.append(self.user.role_refs.to_dom())
dom.append(token)
@ -237,16 +212,6 @@ class ValidateData(object):
if self.user.role_refs != None:
user["roleRefs"] = self.user.role_refs.to_json_values()
"""group = []
for g in self.user.groups.values:
grp = {}
grp["tenantId"] = g.tenant_id
grp["id"] = g.group_id
group.append(grp)
groups = {}
groups["group"] = group
user["groups"] = groups
"""
auth = {}
auth["token"] = token
auth["user"] = user

View File

@ -111,14 +111,6 @@ class TenantConflictFault(IdentityFault):
self.key = "tenantConflict"
class TenantGroupConflictFault(IdentityFault):
"""The tenant Group already exists?"""
def __init__(self, msg, details=None, code=409):
super(TenantGroupConflictFault, self).__init__(msg, details, code)
self.key = "tenantGroupConflict"
class OverlimitFault(IdentityFault):
"""A limit has been exceeded"""
@ -152,15 +144,6 @@ class EmailConflictFault(IdentityFault):
super(EmailConflictFault, self).__init__(msg, details, code)
self.key = "emailConflict"
class UserGroupConflictFault(IdentityFault):
"""The user already exists in group?"""
def __init__(self, msg, details=None, code=409):
super(UserGroupConflictFault, self).__init__(msg, details, code)
self.key = "userGroupConflict"
class RoleConflictFault(IdentityFault):
"""The User already exists?"""

View File

@ -126,219 +126,6 @@ class Tenants(object):
links = [t.to_dict()["links"] for t in self.links]
return json.dumps({"tenants": {"values": values, "links": links}})
class Group(object):
"""Describes a group in the auth system"""
def __init__(self, group_id, description, tenant_id):
self.description = description
self.group_id = group_id
if tenant_id:
self.tenant_id = tenant_id
else:
self.tenant_id = None
@staticmethod
def from_xml(xml_str):
try:
dom = etree.Element("root")
dom.append(etree.fromstring(xml_str))
root = dom.find(\
"{http://docs.openstack.org/identity/api/v2.0}group")
if root == None:
raise fault.BadRequestFault("Expecting Group")
group_id = root.get("id")
tenant_id = None
if root.get("tenantId"):
tenant_id = root.get("tenantId")
else:
tenant_id = None
desc = root.find("{http://docs.openstack.org/identity/api/v2.0}"
"description")
if desc == None:
raise fault.BadRequestFault("Expecting Group Description")
return Group(group_id, desc.text, tenant_id)
except etree.LxmlError as e:
raise fault.BadRequestFault("Cannot parse Group", str(e))
@staticmethod
def from_json(json_str):
try:
obj = json.loads(json_str)
if not "group" in obj:
raise fault.BadRequestFault("Expecting group")
group = obj["group"]
if not "id" in group:
group_id = None
else:
group_id = group["id"]
if not "tenantId" in group:
tenantId = None
else:
tenantId = group["tenantId"]
if not "description" in group:
raise fault.BadRequestFault("Expecting Group Description")
description = group["description"]
return Group(group_id, description, tenantId)
except (ValueError, TypeError) as e:
raise fault.BadRequestFault("Cannot parse Group.", str(e))
def to_dom(self):
dom = etree.Element("group",
xmlns="http://docs.openstack.org/identity/api/v2.0")
if self.group_id:
dom.set("id", self.group_id)
if self.tenant_id:
dom.set("tenantId", self.tenant_id)
desc = etree.Element("description")
desc.text = self.description
dom.append(desc)
return dom
def to_xml(self):
return etree.tostring(self.to_dom())
def to_dict(self):
group = {}
if self.group_id:
group["id"] = self.group_id
group["description"] = self.description
if self.tenant_id:
group["tenantId"] = self.tenant_id
return {'group': group}
def to_json(self):
return json.dumps(self.to_dict())
class Groups(object):
"""A collection of groups."""
def __init__(self, values, links):
self.values = values
self.links = links
def to_xml(self):
dom = etree.Element("groups")
dom.set(u"xmlns", "http://docs.openstack.org/identity/api/v2.0")
for t in self.values:
dom.append(t.to_dom())
for t in self.links:
dom.append(t.to_dom())
return etree.tostring(dom)
def to_json(self):
values = [t.to_dict()["group"] for t in self.values]
links = [t.to_dict()["links"] for t in self.links]
return json.dumps({"groups": {"values": values, "links": links}})
class GlobalGroup(object):
"""Describes a group in the auth system"""
def __init__(self, group_id, description, tenant_id=None):
self.description = description
self.group_id = group_id
@staticmethod
def from_xml(xml_str):
try:
dom = etree.Element("root")
dom.append(etree.fromstring(xml_str))
root = dom.find(\
"{http://docs.openstack.org/identity/api/v2.0}group")
if root == None:
raise fault.BadRequestFault("Expecting Group")
group_id = root.get("id")
desc = root.find("{http://docs.openstack.org/identity/api/v2.0}"
"description")
if desc == None:
raise fault.BadRequestFault("Expecting Group Description")
return GlobalGroup(group_id, desc.text)
except etree.LxmlError as e:
raise fault.BadRequestFault("Cannot parse Group", str(e))
@staticmethod
def from_json(json_str):
try:
obj = json.loads(json_str)
if not "group" in obj:
raise fault.BadRequestFault("Expecting group")
group = obj["group"]
if not "id" in group:
group_id = None
else:
group_id = group["id"]
if not "description" in group:
raise fault.BadRequestFault("Expecting Group Description")
description = group["description"]
return GlobalGroup(group_id, description)
except (ValueError, TypeError) as e:
raise fault.BadRequestFault("Cannot parse Group.", str(e))
def to_dom(self):
dom = etree.Element("group",
xmlns="http://docs.openstack.org/identity/api/v2.0")
if self.group_id:
dom.set("id", self.group_id)
desc = etree.Element("description")
desc.text = self.description
dom.append(desc)
return dom
def to_xml(self):
return etree.tostring(self.to_dom())
def to_dict(self):
group = {}
if self.group_id:
group["id"] = self.group_id
group["description"] = self.description
return {'group': group}
def to_json(self):
return json.dumps(self.to_dict())
class GlobalGroups(object):
"""A collection of groups."""
def __init__(self, values, links):
self.values = values
self.links = links
def to_xml(self):
dom = etree.Element("groups")
dom.set(u"xmlns", "http://docs.openstack.org/identity/api/v2.0")
for t in self.values:
dom.append(t.to_dom())
for t in self.links:
dom.append(t.to_dom())
return etree.tostring(dom)
def to_json(self):
values = [t.to_dict()["group"] for t in self.values]
links = [t.to_dict()["links"] for t in self.links]
return json.dumps({"groups": {"values": values, "links": links}})
class User(object):
"""Describes a user in the auth system
@ -346,12 +133,8 @@ class User(object):
should be considered deprecated.
"""
def __init__(self, user_id, email, enabled, tenant_id='', group_id=''):
def __init__(self, user_id, email, enabled, tenant_id=''):
self.user_id = user_id
if group_id:
self.group_id = group_id
else:
self.group_id = None
if tenant_id:
self.tenant_id = tenant_id
else:
@ -362,8 +145,6 @@ class User(object):
def to_dom(self):
dom = etree.Element("user",
xmlns="http://docs.openstack.org/identity/api/v2.0")
if self.group_id != None:
dom.set("group_id", self.group_id)
if self.user_id:
dom.set("id", self.user_id)
if self.tenant_id:
@ -379,8 +160,6 @@ class User(object):
def to_dict(self):
user = {}
if self.group_id != None:
user["group_id"] = self.group_id
user["id"] = self.user_id
user["email"] = self.email
user["enabled"] = string.lower(str(self.enabled))

View File

@ -139,13 +139,12 @@ class User_Update(object):
"""Document me!"""
def __init__(self, password, user_id, tenant_id, email,
enabled, group=None):
enabled):
self.user_id = user_id
self.tenant_id = tenant_id
self.password = password
self.email = email
self.enabled = enabled and True or False
self.group = group
@staticmethod
def from_xml(xml_str):
@ -221,10 +220,6 @@ class User_Update(object):
dom.set("enabled", string.lower(str(self.enabled)))
if self.password:
dom.set("password", self.password)
if self.group is not None:
for group in self.group:
dom.append(group.to_dom())
return dom
def to_xml(self):
@ -243,9 +238,6 @@ class User_Update(object):
user["email"] = self.email
if self.enabled is not None:
user["enabled"] = self.enabled
if self.group is not None:
values = [t.to_dict()["group"] for t in self.group]
user["groups"] = {"values": values}
return {'user': user}
def to_json(self):

View File

@ -126,9 +126,6 @@ class AuthProtocol(object):
proxy_headers, env)
_decorate_request_headers('X_TENANT', 'blank',
proxy_headers, env)
_decorate_request_headers('X_GROUP', 'Blank',
proxy_headers, env)
#Auth processed, headers added now decide how to pass on the call
if self.app:
# Pass to downstream WSGI component

View File

@ -28,7 +28,7 @@ This WSGI component performs multiple jobs:
mode, which means the final decision is delegated to the downstream WSGI
component (usually the OpenStack service)
- it will collect and forward identity information from a valid token
such as user name, groups, etc...
such as user name etc...
Refer to: http://wiki.openstack.org/openstack-authn
@ -160,14 +160,10 @@ class AuthProtocol(object):
# Store authentication data
if claims:
# TODO(Ziad): add additional details we may need,
# like tenant and group info
self._decorate_request('X_AUTHORIZATION', "Proxy %s" %
claims['user'])
self._decorate_request('X_TENANT', claims['tenant'])
self._decorate_request('X_USER', claims['user'])
if 'group' in claims:
self._decorate_request('X_GROUP', claims['group'])
if 'roles' in claims and len(claims['roles']) > 0:
if claims['roles'] != None:
roles = ''
@ -274,8 +270,6 @@ class AuthProtocol(object):
raise LookupError('Unable to locate claims: %s' % resp.status)
token_info = json.loads(data)
#TODO(Ziad): make this more robust
#first_group = token_info['auth']['user']['groups']['group'][0]
roles = []
role_refs = token_info["auth"]["user"]["roleRefs"]
if role_refs != None:
@ -285,10 +279,6 @@ class AuthProtocol(object):
verified_claims = {'user': token_info['auth']['user']['username'],
'tenant': token_info['auth']['user']['tenantId'],
'roles': roles}
# TODO(Ziad): removed groups for now
# ,'group': '%s/%s' % (first_group['id'],
# first_group['tenantId'])}
return verified_claims
def _decorate_request(self, index, value):

View File

@ -221,7 +221,6 @@ class AuthProtocol(object):
for role_ref in role_refs:
roles.append(role_ref["roleId"])
# TODO(Ziad): add groups back in
identity = {'user': identity_info['auth']['user']['username'],
'tenant': identity_info['auth']['user']['tenantId'],
'roles':roles}

View File

@ -4,7 +4,6 @@ from keystone.common import wsgi
import keystone.backends as db
from keystone.controllers.auth import AuthController
from keystone.controllers.endpointtemplates import EndpointTemplatesController
from keystone.controllers.groups import GroupsController
from keystone.controllers.roles import RolesController
from keystone.controllers.staticfiles import StaticFilesController
from keystone.controllers.tenant import TenantController
@ -50,42 +49,6 @@ class AdminApi(wsgi.Router):
controller=tenant_controller,
action="delete_tenant", conditions=dict(method=["DELETE"]))
# Tenant Group Operations
mapper.connect("/v2.0/tenants/{tenant_id}/groups",
controller=tenant_controller,
action="create_tenant_group",
conditions=dict(method=["PUT", "POST"]))
mapper.connect("/v2.0/tenants/{tenant_id}/groups",
controller=tenant_controller,
action="get_tenant_groups",
conditions=dict(method=["GET"]))
mapper.connect("/v2.0/tenants/{tenant_id}/groups/{group_id}",
controller=tenant_controller,
action="get_tenant_group",
conditions=dict(method=["GET"]))
mapper.connect("/v2.0/tenants/{tenant_id}/groups/{group_id}",
controller=tenant_controller,
action="update_tenant_group",
conditions=dict(method=["PUT"]))
mapper.connect("/v2.0/tenants/{tenant_id}/groups/{group_id}",
controller=tenant_controller,
action="delete_tenant_group",
conditions=dict(method=["DELETE"]))
mapper.connect("/v2.0/tenants/{tenant_id}/groups/{group_id}/users",
controller=tenant_controller,
action="get_users_tenant_group",
conditions=dict(method=["GET"]))
mapper.connect(
"/v2.0/tenants/{tenant_id}/groups/{group_id}/users/{user_id}",
controller=tenant_controller,
action="add_user_tenant_group",
conditions=dict(method=["PUT"]))
mapper.connect(
"/v2.0/tenants/{tenant_id}/groups/{group_id}/users/{user_id}",
controller=tenant_controller,
action="delete_user_tenant_group",
conditions=dict(method=["DELETE"]))
# User Operations
user_controller = UserController(options)
mapper.connect("/v2.0/users",
@ -121,39 +84,10 @@ class AdminApi(wsgi.Router):
controller=user_controller,
action="set_user_enabled",
conditions=dict(method=["PUT"]))
mapper.connect("/v2.0/users/{user_id}/groups",
controller=user_controller,
action="get_user_groups",
conditions=dict(method=["GET"]))
mapper.connect("/v2.0/tenants/{tenant_id}/users",
controller=user_controller,
action="get_tenant_users",
conditions=dict(method=["GET"]))
#Global Groups
groups_controller = GroupsController(options)
mapper.connect("/v2.0/groups", controller=groups_controller,
action="create_group",
conditions=dict(method=["PUT", "POST"]))
mapper.connect("/v2.0/groups", controller=groups_controller,
action="get_groups", conditions=dict(method=["GET"]))
mapper.connect("/v2.0/groups/{group_id}", controller=groups_controller,
action="get_group", conditions=dict(method=["GET"]))
mapper.connect("/v2.0/groups/{group_id}", controller=groups_controller,
action="update_group", conditions=dict(method=["PUT"]))
mapper.connect("/v2.0/groups/{group_id}", controller=groups_controller,
action="delete_group", conditions=dict(method=["DELETE"]))
mapper.connect("/v2.0/groups/{group_id}/users",
controller=groups_controller,
action="get_users_global_group",
conditions=dict(method=["GET"]))
mapper.connect("/v2.0/groups/{group_id}/users/{user_id}",
controller=groups_controller,
action="add_user_global_group",
conditions=dict(method=["PUT"]))
mapper.connect("/v2.0/groups/{group_id}/users/{user_id}",
controller=groups_controller,
action="delete_user_global_group",
conditions=dict(method=["DELETE"]))
#Roles and RoleRefs
roles_controller = RolesController(options)

View File

@ -93,18 +93,6 @@ def create_tenant(tenantid, auth_token):
return (resp, content)
def create_tenant_group(groupid, tenantid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups' % (URL_V2, tenantid)
body = {"group": {"id": groupid,
"description": "A description ..."}}
resp, content = header.request(url, "PUT", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_tenant(tenantid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s' % (URL_V2, tenantid)
@ -113,62 +101,6 @@ def delete_tenant(tenantid, auth_token):
"X-Auth-Token": auth_token})
return resp
def delete_tenant_group(groupid, tenantid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s' % (URL_V2, tenantid, groupid)
resp, content = header.request(url, "DELETE", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_global_group(groupid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups' % (URL_V2)
body = {"group": {"id": groupid,
"description": "A description ..."}}
resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_global_group_xml(groupid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups' % (URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?>\
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description>A Description of the group</description>\
</group>' % groupid
resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def delete_global_group(groupid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s' % (URL_V2, groupid)
resp, content = header.request(url, "DELETE", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_global_group_xml(groupid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s' % (URL_V2, groupid)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def get_token_xml(user, pswd, tenant_id, return_type=''):
header = httplib2.Http(".cache")
url = '%stokens' % URL_V2
@ -220,21 +152,6 @@ def create_tenant_xml(tenantid, auth_token):
return (resp, content)
def create_tenant_group_xml(groupid, tenantid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups' % (URL_V2, tenantid)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"> \
<description>A description...</description> \
</group>' % groupid
resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def delete_tenant_xml(tenantid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s' % (URL_V2, tenantid)
@ -245,17 +162,6 @@ def delete_tenant_xml(tenantid, auth_token):
return resp
def delete_tenant_group_xml(groupid, tenantid, auth_token):
header = httplib2.Http(".cache")
url = '%stenant/%s/groups/%s' % (URL_V2, tenantid, groupid)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def create_user(tenantid, userid, auth_token, email=None, password = 'secrete'):
header = httplib2.Http(".cache")
url = '%susers' % (URL_V2)
@ -498,146 +404,6 @@ def users_get_xml(tenant_id, auth_token):
"ACCEPT": "application/xml"})
return (resp, content)
def users_group_get_json(user_id, auth_token):
header = httplib2.Http(".cache")
url = '%susers/%s/groups' % (URL_V2, user_id)
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def users_group_get_xml(user_id, auth_token):
header = httplib2.Http(".cache")
url = '%susers/%s/groups' % (URL_V2, user_id)
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def add_user_tenant_group(tenantid, groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (
URL_V2, tenantid, groupid, userid)
resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def add_user_tenant_group_xml(tenantid, groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (
URL_V2, tenantid, groupid, userid)
resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def delete_user_tenant_group(tenantid, groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (
URL_V2, tenantid, groupid, userid)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_user_tenant_group_xml(tenantid, groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (
URL_V2, tenantid, groupid, userid)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def get_user_tenant_group(tenantid, groupid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users' % (URL_V2, tenantid, groupid)
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def get_user_tenant_group_xml(tenantid, groupid, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users' % (URL_V2, tenantid, groupid)
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def add_user_global_group(groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL_V2, groupid, userid)
resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def add_user_global_group_xml(groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL_V2, groupid, userid)
resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def delete_user_global_group(groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL_V2, groupid, userid)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_user_global_group_xml(groupid, userid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL_V2, groupid, userid)
resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def get_user_global_group(groupid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s/users' % (URL_V2, groupid)
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def get_userid():
return 'test_user11'
@ -649,18 +415,6 @@ def get_password():
def get_email():
return 'joetest@openstack.org'
def get_user_global_group_xml(groupid, auth_token):
header = httplib2.Http(".cache")
url = '%sgroups/%s/users' % (URL_V2, groupid)
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
def get_tenant():
return '1234'
@ -769,7 +523,7 @@ def create_role_xml(role_id, auth_token):
url = '%sroles' % (URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?>\
<role xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s" description="A Description of the group"/>\
id="%s" description="A Description of the role"/>\
' % role_id
resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",

View File

@ -1,965 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import httplib2
import json
from lxml import etree
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
'..', '..', '..', '..', '..', 'keystone')))
import unittest
import test_common as utils
##
## Global Group Tests
##
class GlobalGroupTest(unittest.TestCase):
def setUp(self):
self.globaltenant = utils.get_global_tenant()
self.user = utils.get_user()
self.userdisabled = utils.get_userdisabled()
self.auth_token = utils.get_auth_token()
self.exp_auth_token = utils.get_exp_auth_token()
self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group_add'
utils.create_tenant(self.globaltenant, str(self.auth_token))
utils.create_user(self.globaltenant, self.user, self.auth_token)
utils.add_user_json(self.auth_token)
self.token = utils.get_token(self.user, 'secrete', self.globaltenant,
'token')
def tearDown(self):
utils.delete_user(self.user, str(self.auth_token))
utils.delete_global_group(self.global_group, self.auth_token)
utils.delete_tenant(self.globaltenant, self.auth_token)
class CreateGlobalGroupTest(GlobalGroupTest):
def test_global_group_create(self):
utils.delete_global_group(self.global_group, str(self.auth_token))
resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
if int(resp_new['status']) == 500:
self.fail('Identity fault')
elif int(resp_new['status']) == 503:
self.fail('Service Not Available')
if int(resp_new['status']) not in (200, 201):
self.fail('Failed due to %d' % int(resp_new['status']))
def test_global_group_create_xml(self):
utils.delete_global_group_xml(self.global_group, str(self.auth_token))
resp_new, _content_new = utils.create_global_group_xml(\
self.global_group,
str(self.auth_token))
if int(resp_new['status']) == 500:
self.fail('Identity fault')
elif int(resp_new['status']) == 503:
self.fail('Service Not Available')
if int(resp_new['status']) not in (200, 201):
self.fail('Failed due to %d' % int(resp_new['status']))
def test_global_group_create_again(self):
utils.create_global_group(self.global_group, str(self.auth_token))
resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
if int(resp_new['status']) == 500:
self.fail('Identity fault')
elif int(resp_new['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(resp_new['status']))
def test_global_group_create_again_xml(self):
utils.create_global_group_xml(self.global_group, str(self.auth_token))
resp_new, content_new = utils.create_global_group_xml(\
self.global_group,
str(self.auth_token))
content_new = etree.fromstring(content_new)
if int(resp_new['status']) == 500:
self.fail('Identity fault')
elif int(resp_new['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(resp_new['status']))
def test_global_group_create_unauthorized_token(self):
_header = httplib2.Http(".cache")
resp, _content = utils.create_global_group(\
self.global_group,
str(self.token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_global_group_create_unauthorized_token_xml(self):
_header = httplib2.Http(".cache")
resp, _content = utils.create_global_group_xml(\
self.global_group,
str(self.token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_global_group_create_expired_token(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = {"group": {"id": self.global_group,
"description": "A description ..."}}
resp, _content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": \
self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_global_group_create_expired_token_xml(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description>A description...</description> \
</group>' % self.globaltenant
resp, _content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_global_group_create_missing_token(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = {"group": {"id": self.global_group,
"description": "A description ..."}}
resp, _content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_global_group_create_missing_token_xml(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description>A description...</description> \
</group>' % self.global_group
resp, _content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_global_group_create_disabled_token(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = '{"group": { "id": "%s", \
"description": "A description ..." } }' % self.global_group
resp, _content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
"X-Auth-Token": \
self.disabled_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_global_group_create_disabled_token_xml(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description>A description...</description> \
</group>' % self.global_group
resp, _content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.disabled_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_global_group_create_invalid_token(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = '{"group": { "id": "%s", \
"description": "A description ..." } }' % self.globaltenant
resp, _content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
"X-Auth-Token": 'nonexsitingtoken'})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_global_group_create_invalid_token_xml(self):
header = httplib2.Http(".cache")
url = '%sgroups' % (utils.URL_V2)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description>A description...</description> \
</group>' % self.global_group
resp, _content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": 'nonexsitingtoken',
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
class GetGlobalGroupsTest(GlobalGroupTest):
def test_get_global_groups(self):
header = httplib2.Http(".cache")
utils.delete_global_group(self.global_group, str(self.auth_token))
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups' % (utils.URL_V2)
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
def test_get_global_groups_xml(self):
header = httplib2.Http(".cache")
utils.create_global_group_xml(self.global_group, str(self.auth_token))
url = '%sgroups' % (utils.URL_V2)
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
def test_get_global_groups_unauthorized_token(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups' % (utils.URL_V2)
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_global_groups_unauthorized_token_xml(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group_xml(\
self.global_group,
str(self.auth_token))
url = '%sgroups' % (utils.URL_V2)
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_global_groups_exp_token(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups' % (utils.URL_V2)
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": \
self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_global_groups_exp_token_xml(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group_xml(\
self.global_group,
str(self.auth_token))
url = '%sgroups' % (utils.URL_V2)
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
class GetGlobalGroupTest(GlobalGroupTest):
def test_get_global_group(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, self.global_group)
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
def test_get_global_group_xml(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group_xml(\
self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, self.global_group)
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
def test_get_global_group_bad(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, 'global_group_bad')
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_global_group_bad_xml(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group_xml(\
self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, 'global_group_bad')
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
class UpdateGlobalGroupsTest(GlobalGroupTest):
def test_update_global_group(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, self.global_group)
resp, content = header.request(url, "PUT", body='{"group":{\
"id" : "%s","description" :\
"A New description of the group..."}}' % self.global_group,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
body = json.loads(content)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
self.assertEqual(self.global_group, body['group']['id'])
self.assertEqual('A New description of the group...',
str(body['group']['description']))
def test_update_global_group_xml(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, self.global_group)
data = u'<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description>A NEW description...</description> \
</group>' % (self.global_group)
#test for Content-Type = application/json
resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
body = etree.fromstring(content)
desc = body.find("{http://docs.openstack.org/identity/api/v2.0}description")
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
self.assertEqual(str(self.global_group), str(body.get('id')))
self.assertEqual('A NEW description...', desc.text)
def test_update_global_group_bad(self):
header = httplib2.Http(".cache")
_resp_new, _content_new = utils.create_global_group(self.global_group,
str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, self.global_group)
data = '{"group": { "description_bad": "A NEW description...", \
"id":"%s" }}'\
% (self.global_group)
#test for Content-Type = application/json
resp, _content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(400, int(resp['status']))
def test_update_global_group_bad_xml(self):
header = httplib2.Http(".cache")
utils.create_global_group_xml(self.global_group, str(self.auth_token))
url = '%sgroups/%s' % (utils.URL_V2, self.global_group)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="%s"><description_bad>A NEW description...</description> \
</group>' % (self.global_group)
#test for Content-Type = application/json
resp, _content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(400, int(resp['status']))
def test_update_global_group_not_found(self):
header = httplib2.Http(".cache")
utils.create_global_group(self.global_group, str(self.auth_token))
url = '%sgroups/NonexistingID' % (utils.URL_V2)
data = '{"group": { "description": "A NEW description...", \
"id":"NonexistingID"}}'
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
self.assertEqual(404, int(resp['status']))
def test_update_global_group_not_found_xml(self):
header = httplib2.Http(".cache")
utils.create_tenant_xml(self.globaltenant, str(self.auth_token))
url = '%sgroups/NonexistingID' % (utils.URL_V2)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/identity/api/v2.0" \
id="NonexistingID"> \
<description_bad>A NEW description...</description> \
</group>'
#test for Content-Type = application/json
resp, _content = header.request(url, "GET", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
class DeleteGlobalGroupTest(GlobalGroupTest):
def test_delete_global_group_not_found(self):
resp, _content = utils.delete_global_group("test_global_group_1",
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_global_group_not_found_xml(self):
resp, _content = utils.delete_global_group_xml("test_global_group_1",
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_global_group(self):
utils.create_tenant(self.globaltenant, str(self.auth_token))
utils.create_tenant_group('test_global_group_delete',
self.globaltenant, str(self.auth_token))
resp_new, _content_new = utils.delete_global_group(\
'test_global_group_delete',
str(self.auth_token))
_resp = utils.delete_tenant(self.globaltenant, str(self.auth_token))
self.assertEqual(204, int(resp_new['status']))
def test_delete_global_group_xml(self):
utils.create_tenant_xml(self.globaltenant, str(self.auth_token))
utils.create_tenant_group_xml('test_global_group_delete',
self.globaltenant, str(self.auth_token))
resp_new, _content_new = utils.delete_global_group_xml(\
'test_global_group_delete',
str(self.auth_token))
utils.delete_tenant_xml(self.globaltenant, str(self.auth_token))
self.assertEqual(204, int(resp_new['status']))
class AddUserGlobalGroupTest(unittest.TestCase):
def setUp(self):
self.tenant = utils.get_global_tenant()
self.auth_token = utils.get_auth_token()
self.user = utils.get_user()
self.userdisabled = utils.get_userdisabled()
self.exp_auth_token = utils.get_exp_auth_token()
self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group'
utils.create_tenant(self.tenant, str(self.auth_token))
utils.create_user(self.tenant, self.user, self.auth_token)
utils.add_user_json(self.auth_token)
self.token = utils.get_token(self.user, 'secrete', self.tenant,
'token')
def tearDown(self):
utils.delete_user_global_group(self.global_group, self.user,
str(self.auth_token))
utils.delete_user(self.user, str(self.auth_token))
utils.delete_user(self.user, self.auth_token)
utils.delete_global_group(self.global_group, self.auth_token)
def test_add_user_global_group(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group(self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
if int(resp_new['status']) not in (200, 201):
self.fail('Failed due to %d' % int(resp_new['status']))
def test_add_user_global_group_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group_xml(\
self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
if int(resp_new['status']) not in (200, 201):
self.fail('Failed due to %d' % int(resp_new['status']))
def test_add_user_global_group_conflict(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group(self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(resp_new['status']))
def test_add_user_global_group_conflict_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group_xml(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group_xml(\
self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(resp_new['status']))
def test_add_user_global_group_unauthorized(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group(self.global_group,
self.user,
str(self.token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp_new['status']))
def test_add_user_global_group_unauthorized_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group_xml(\
self.global_group,
self.user,
str(self.token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp_new['status']))
def test_add_user_global_group_forbidden(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group(\
self.global_group,
self.user,
str(self.disabled_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp_new['status']))
def test_add_user_global_group_forbidden_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
resp_new, _content_new = utils.add_user_global_group_xml(\
self.global_group,
self.user,
str(self.disabled_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp_new['status']))
class GetUsersTenantGroupTest(unittest.TestCase):
def setUp(self):
self.tenant = utils.get_global_tenant()
self.user = utils.get_user()
self.userdisabled = utils.get_userdisabled()
self.auth_token = utils.get_auth_token()
self.exp_auth_token = utils.get_exp_auth_token()
self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group'
utils.create_tenant(self.tenant, str(self.auth_token))
utils.create_user(self.tenant, self.user, self.auth_token)
utils.add_user_json(self.auth_token)
self.token = utils.get_token(self.user, 'secrete', self.tenant,
'token')
def tearDown(self):
utils.delete_user_global_group(self.global_group, self.user,
str(self.auth_token))
utils.delete_user(self.user, str(self.auth_token))
utils.delete_global_group(self.global_group, self.auth_token)
def test_get_users_global_group(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group(
self.global_group,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp_new['status']))
def test_get_users_global_group_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group_xml(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group_xml(\
self.global_group,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp_new['status']))
def test_get_users_global_group_unauthorized(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group(\
self.global_group,
str(self.token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp_new['status']))
def test_get_users_global_group_unauthorized_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group_xml(\
self.global_group,
str(self.token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp_new['status']))
def test_get_users_global_group_forbidden(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group(\
self.global_group,
str(self.disabled_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp_new['status']))
def test_get_users_global_group_forbidden_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group_xml(\
self.global_group,
str(self.disabled_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp_new['status']))
def test_get_users_global_group_expired(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group(\
self.global_group,
str(self.exp_auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp_new['status']))
def test_get_users_global_group_expired_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.get_user_global_group_xml(\
self.global_group,
str(self.exp_auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp_new['status']))
class DeleteUsersGlobalGroupTest(unittest.TestCase):
def setUp(self):
self.tenant = utils.get_global_tenant()
self.user = utils.get_user()
self.userdisabled = utils.get_userdisabled()
self.auth_token = utils.get_auth_token()
self.exp_auth_token = utils.get_exp_auth_token()
self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group'
utils.create_tenant(self.tenant, str(self.auth_token))
utils.create_user(self.tenant, self.user, self.auth_token)
utils.add_user_json(self.auth_token)
self.token = utils.get_token(self.user, 'secrete', self.tenant,
'token')
def tearDown(self):
utils.delete_user_global_group(self.global_group, self.user,
str(self.auth_token))
utils.delete_user(self.user, str(self.auth_token))
utils.delete_global_group(self.global_group, self.auth_token)
def test_delete_user_global_group(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.delete_user_global_group(\
self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp_new['status']))
def test_delete_user_global_group_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.delete_user_global_group_xml(\
self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp_new['status']))
def test_delete_user_global_group_notfound(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.disabled_token))
utils.delete_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.delete_user_global_group(
self.global_group,
self.user,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp_new['status']))
def test_delete_user_global_group_notfound_xml(self):
resp, _content = utils.create_global_group(self.global_group,
str(self.auth_token))
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
utils.create_user(self.tenant, self.user, str(self.auth_token))
utils.add_user_global_group(self.global_group, self.user,
str(self.disabled_token))
utils.delete_user_global_group(self.global_group, self.user,
str(self.auth_token))
resp_new, _content_new = utils.delete_user_global_group_xml(\
self.global_group, self.user,
str(self.auth_token))
self.assertEqual(404, int(resp_new['status']))
if __name__ == '__main__':
unittest.main()

View File

@ -10,11 +10,9 @@ TEST_FILES = [
'test_common.py', # this doesn't actually contain tests
'test_endpoints.py',
#'test_urlrewritefilter.py',
'test_groups.py',
'test_keystone.py', # not sure why this is referencing itself
'test_roles.py',
#'test_server.py', # this is largely failing
'test_tenant_groups.py',
'test_tenants.py',
'test_token.py',
'test_users.py',

View File

@ -25,7 +25,7 @@ class TestServer(unittest.TestCase):
environ = {'wsgi.url_scheme': 'http'}
self.request = Request(environ)
self.auth_data = auth.AuthData(auth.Token(date.today(), "2231312"),
auth.User("username", "12345", auth.Groups([], [])))
auth.User("username", "12345"))
#def tearDown(self):

File diff suppressed because it is too large Load Diff

View File

@ -726,113 +726,6 @@ class GetUsersTest(UserTest):
# self.assertEqual('application/xml', utils.content_type(resp))
class GetUsersGroupTest(UserTest):
def test_users_group_get(self):
resp, _content = utils.users_group_get_json(self.user,
self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(200, resp_val)
def test_users_group_get_xml(self):
resp, _content = utils.users_group_get_xml(self.user,
self.auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(200, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_expired_token(self):
resp, _content = utils.users_group_get_json(self.user,
self.exp_auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
def test_users_group_get_expired_token_xml(self):
resp, _content = utils.users_group_get_xml(self.user,
self.exp_auth_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_disabled_token(self):
resp, _content = utils.users_group_get_json(self.user,
self.disabled_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
def test_users_group_get_disabled_token_xml(self):
resp, _content = utils.users_group_get_xml(self.user,
self.disabled_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(403, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_missing_token(self):
resp, _content = utils.users_group_get_json(self.user,
self.missing_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(401, resp_val)
def test_users_group_get_missing_token_xml(self):
resp, _content = utils.users_group_get_xml(self.user,
self.missing_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(401, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
def test_users_group_get_invalid_token(self):
resp, _content = utils.users_group_get_json(self.user,
self.invalid_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(404, resp_val)
def test_users_group_get_invalid_token_xml(self):
resp, _content = utils.users_group_get_xml(self.user,
self.invalid_token)
resp_val = int(resp['status'])
if resp_val == 500:
self.fail('Identity Fault')
elif resp_val == 503:
self.fail('Service Not Available')
self.assertEqual(404, resp_val)
self.assertEqual('application/xml', utils.content_type(resp))
class UpdateUserTest(UserTest):
def setUp(self):