v3 Identity

- v3 identity tests (bug 1023930)
- v3 identity implementation (bug 1023937)

Change-Id: Ic46575afe9760d9da85e262d0cf063ea002d9dcd
This commit is contained in:
Dolph Mathews 2012-11-09 08:32:18 -06:00 committed by Gerrit Code Review
parent ff669f0da9
commit ddc8c83368
6 changed files with 891 additions and 124 deletions

View File

@ -37,15 +37,20 @@ def check_type(property_name, value, expected_type, display_expected_type):
raise exception.ValidationError(msg) raise exception.ValidationError(msg)
def tenant_name(name): def check_name(property_name, name):
check_type("Tenant name", name, basestring, "string or unicode") check_type('%s name' % property_name, name, basestring, 'str or unicode')
name = name.strip() name = name.strip()
check_length("Tenant name", name) check_length('%s name' % property_name, name)
return name return name
def domain_name(name):
return check_name('Domain', name)
def tenant_name(name):
return check_name('Tenant', name)
def user_name(name): def user_name(name):
check_type("User name", name, basestring, "string or unicode") return check_name('User', name)
name = name.strip()
check_length("User name", name)
return name

View File

@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import migrate
import sqlalchemy as sql
def upgrade(migrate_engine):
# Upgrade operations go here. Don't create your own engine; bind
# migrate_engine to your metadata
meta = sql.MetaData()
meta.bind = migrate_engine
domain_table = sql.Table(
'domain',
meta,
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('name', sql.String(64), unique=True, nullable=False),
sql.Column('extra', sql.Text()))
domain_table.create(migrate_engine, checkfirst=True)
sql.Table('user', meta, autoload=True)
user_domain_metadata_table = sql.Table(
'user_domain_metadata',
meta,
sql.Column(
'user_id',
sql.String(64),
sql.ForeignKey('user.id'),
primary_key=True),
sql.Column(
'domain_id',
sql.String(64),
sql.ForeignKey('domain.id'),
primary_key=True),
sql.Column('data', sql.Text()))
user_domain_metadata_table.create(migrate_engine, checkfirst=True)
sql.Table('tenant', meta, autoload=True)
credential_table = sql.Table(
'credential',
meta,
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('user_id',
sql.String(64),
sql.ForeignKey('user.id'),
nullable=False),
sql.Column('project_id',
sql.String(64),
sql.ForeignKey('tenant.id')),
sql.Column('blob', sql.Text(), nullable=False),
sql.Column('type', sql.String(255), nullable=False),
sql.Column('extra', sql.Text()))
credential_table.create(migrate_engine, checkfirst=True)
role = sql.Table('role', meta, autoload=True)
extra = sql.Column('extra', sql.Text())
role.create_column(extra)
def downgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
role = sql.Table('role', meta, autoload=True)
role.drop_column('extra')

View File

@ -95,10 +95,18 @@ class ServiceNotFound(NotFound):
"""Could not find service: %(service_id)s""" """Could not find service: %(service_id)s"""
class DomainNotFound(NotFound):
"""Could not find domain: %(domain_id)s"""
class TenantNotFound(NotFound): class TenantNotFound(NotFound):
"""Could not find tenant: %(tenant_id)s""" """Could not find tenant: %(tenant_id)s"""
class ProjectNotFound(TenantNotFound):
"""Could not find project: %(project_id)s"""
class TokenNotFound(NotFound): class TokenNotFound(NotFound):
"""Could not find token: %(token_id)s""" """Could not find token: %(token_id)s"""

View File

@ -357,3 +357,28 @@ class Identity(kvs.Base, identity.Driver):
role_list = set(self.db.get('role_list', [])) role_list = set(self.db.get('role_list', []))
role_list.remove(role_id) role_list.remove(role_id)
self.db.set('role_list', list(role_list)) self.db.set('role_list', list(role_list))
# domain crud
def create_domain(self, domain_id, domain):
self.db.set('domain-%s' % domain_id, domain)
domain_list = set(self.db.get('domain_list', []))
domain_list.add(domain_id)
self.db.set('domain_list', list(domain_list))
return domain
def list_domains(self):
return self.db.get('domain_list', [])
def get_domain(self, domain_id):
return self.db.get('domain-%s' % domain_id)
def update_domain(self, domain_id, domain):
self.db.set('domain-%s' % domain_id, domain)
return domain
def delete_domain(self, domain_id):
self.db.delete('domain-%s' % domain_id)
domain_list = set(self.db.get('domain_list', []))
domain_list.remove(domain_id)
self.db.set('domain_list', list(domain_list))

View File

@ -45,7 +45,30 @@ class User(sql.ModelBase, sql.DictBase):
extra = sql.Column(sql.JsonBlob()) extra = sql.Column(sql.JsonBlob())
class Credential(sql.ModelBase, sql.DictBase):
__tablename__ = 'credential'
attributes = ['id', 'user_id', 'project_id', 'blob', 'type']
id = sql.Column(sql.String(64), primary_key=True)
user_id = sql.Column(sql.String(64),
sql.ForeignKey('user.id'),
nullable=False)
project_id = sql.Column(sql.String(64), sql.ForeignKey('tenant.id'))
blob = sql.Column(sql.JsonBlob(), nullable=False)
type = sql.Column(sql.String(255), nullable=False)
extra = sql.Column(sql.JsonBlob())
class Domain(sql.ModelBase, sql.DictBase):
__tablename__ = 'domain'
attributes = ['id', 'name']
id = sql.Column(sql.String(64), primary_key=True)
name = sql.Column(sql.String(64), unique=True, nullable=False)
extra = sql.Column(sql.JsonBlob())
# TODO(dolph): rename to Project
class Tenant(sql.ModelBase, sql.DictBase): class Tenant(sql.ModelBase, sql.DictBase):
# TODO(dolph): rename to project
__tablename__ = 'tenant' __tablename__ = 'tenant'
attributes = ['id', 'name'] attributes = ['id', 'name']
id = sql.Column(sql.String(64), primary_key=True) id = sql.Column(sql.String(64), primary_key=True)
@ -58,11 +81,14 @@ class Role(sql.ModelBase, sql.DictBase):
attributes = ['id', 'name'] attributes = ['id', 'name']
id = sql.Column(sql.String(64), primary_key=True) id = sql.Column(sql.String(64), primary_key=True)
name = sql.Column(sql.String(64), unique=True, nullable=False) name = sql.Column(sql.String(64), unique=True, nullable=False)
extra = sql.Column(sql.JsonBlob())
class Metadata(sql.ModelBase, sql.DictBase): class UserProjectMetadata(sql.ModelBase, sql.DictBase):
# TODO(dolph): rename to user_project_metadata (needs a migration)
__tablename__ = 'metadata' __tablename__ = 'metadata'
user_id = sql.Column(sql.String(64), primary_key=True) user_id = sql.Column(sql.String(64), primary_key=True)
# TODO(dolph): rename to project_id (needs a migration)
tenant_id = sql.Column(sql.String(64), primary_key=True) tenant_id = sql.Column(sql.String(64), primary_key=True)
data = sql.Column(sql.JsonBlob()) data = sql.Column(sql.JsonBlob())
@ -75,6 +101,14 @@ class Metadata(sql.ModelBase, sql.DictBase):
return dict(self.iteritems()) return dict(self.iteritems())
class UserDomainMetadata(sql.ModelBase, sql.DictBase):
__tablename__ = 'user_domain_metadata'
user_id = sql.Column(sql.String(64), primary_key=True)
domain_id = sql.Column(sql.String(64), primary_key=True)
data = sql.Column(sql.JsonBlob())
# TODO(dolph): ... do we need this table?
class UserTenantMembership(sql.ModelBase, sql.DictBase): class UserTenantMembership(sql.ModelBase, sql.DictBase):
"""Tenant membership join table.""" """Tenant membership join table."""
__tablename__ = 'user_tenant_membership' __tablename__ = 'user_tenant_membership'
@ -164,52 +198,79 @@ class Identity(sql.Base, identity.Driver):
return [identity.filter_user(user_ref.to_dict()) return [identity.filter_user(user_ref.to_dict())
for user_ref in user_refs] for user_ref in user_refs]
def _get_user(self, user_id): def get_metadata(self, user_id, tenant_id=None, domain_id=None):
session = self.get_session() session = self.get_session()
user_ref = session.query(User).filter_by(id=user_id).first()
if not user_ref:
raise exception.UserNotFound(user_id=user_id)
return user_ref.to_dict()
def _get_user_by_name(self, user_name): if tenant_id:
session = self.get_session() q = session.query(UserProjectMetadata)
user_ref = session.query(User).filter_by(name=user_name).first() q = q.filter_by(tenant_id=tenant_id)
if not user_ref: elif domain_id:
raise exception.UserNotFound(user_id=user_name) q = session.query(UserDomainMetadata)
return user_ref.to_dict() q = q.filter_by(domain_id=domain_id)
q = q.filter_by(user_id=user_id)
def get_user(self, user_id): try:
return identity.filter_user(self._get_user(user_id)) return q.one().data
except sql.NotFound:
def get_user_by_name(self, user_name):
return identity.filter_user(self._get_user_by_name(user_name))
def get_metadata(self, user_id, tenant_id):
session = self.get_session()
query = session.query(Metadata)
query = query.filter_by(user_id=user_id)
query = query.filter_by(tenant_id=tenant_id)
metadata_ref = query.first()
if metadata_ref is None:
raise exception.MetadataNotFound() raise exception.MetadataNotFound()
return metadata_ref.data
def get_role(self, role_id): def create_grant(self, role_id, user_id, domain_id, project_id):
session = self.get_session() self.get_role(role_id)
role_ref = session.query(Role).filter_by(id=role_id).first() self.get_user(user_id)
if role_ref is None: if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
is_new = False
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
roles = set(metadata_ref.get('roles', []))
roles.add(role_id)
metadata_ref['roles'] = list(roles)
if is_new:
self.create_metadata(user_id, project_id, metadata_ref, domain_id)
else:
self.update_metadata(user_id, project_id, metadata_ref, domain_id)
def list_grants(self, user_id, domain_id, project_id):
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
return [self.get_role(x) for x in metadata_ref.get('roles', [])]
def get_grant(self, role_id, user_id, domain_id, project_id):
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
role_ids = set(metadata_ref.get('roles', []))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id) raise exception.RoleNotFound(role_id=role_id)
return role_ref return self.get_role(role_id)
def list_users(self): def delete_grant(self, role_id, user_id, domain_id, project_id):
session = self.get_session() self.get_role(role_id)
user_refs = session.query(User) self.get_user(user_id)
return [identity.filter_user(x.to_dict()) for x in user_refs] if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
def list_roles(self): try:
session = self.get_session() metadata_ref = self.get_metadata(user_id, project_id, domain_id)
role_refs = session.query(Role) is_new = False
return list(role_refs) except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
roles = set(metadata_ref.get('roles', []))
try:
roles.remove(role_id)
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
metadata_ref['roles'] = list(roles)
if is_new:
self.create_metadata(user_id, project_id, metadata_ref, domain_id)
else:
self.update_metadata(user_id, project_id, metadata_ref, domain_id)
# These should probably be part of the high-level API # These should probably be part of the high-level API
def add_user_to_tenant(self, tenant_id, user_id): def add_user_to_tenant(self, tenant_id, user_id):
@ -306,6 +367,198 @@ class Identity(sql.Base, identity.Driver):
self.update_metadata(user_id, tenant_id, metadata_ref) self.update_metadata(user_id, tenant_id, metadata_ref)
# CRUD # CRUD
@handle_conflicts(type='tenant')
def create_tenant(self, tenant_id, tenant):
tenant['name'] = clean.tenant_name(tenant['name'])
session = self.get_session()
with session.begin():
tenant_ref = Tenant.from_dict(tenant)
session.add(tenant_ref)
session.flush()
return tenant_ref.to_dict()
@handle_conflicts(type='tenant')
def update_tenant(self, tenant_id, tenant):
session = self.get_session()
if 'name' in tenant:
tenant['name'] = clean.tenant_name(tenant['name'])
try:
tenant_ref = session.query(Tenant).filter_by(id=tenant_id).one()
except sql.NotFound:
raise exception.TenantNotFound(tenant_id=tenant_id)
with session.begin():
old_tenant_dict = tenant_ref.to_dict()
for k in tenant:
old_tenant_dict[k] = tenant[k]
new_tenant = Tenant.from_dict(old_tenant_dict)
tenant_ref.name = new_tenant.name
tenant_ref.extra = new_tenant.extra
session.flush()
return tenant_ref.to_dict(include_extra_dict=True)
def delete_tenant(self, tenant_id):
session = self.get_session()
try:
tenant_ref = session.query(Tenant).filter_by(id=tenant_id).one()
except sql.NotFound:
raise exception.TenantNotFound(tenant_id=tenant_id)
with session.begin():
q = session.query(UserTenantMembership)
q = q.filter_by(tenant_id=tenant_id)
q.delete(False)
q = session.query(UserProjectMetadata)
q = q.filter_by(tenant_id=tenant_id)
q.delete(False)
if not session.query(Tenant).filter_by(id=tenant_id).delete(False):
raise exception.TenantNotFound(tenant_id=tenant_id)
session.delete(tenant_ref)
session.flush()
@handle_conflicts(type='metadata')
def create_metadata(self, user_id, tenant_id, metadata, domain_id=None):
session = self.get_session()
with session.begin():
if tenant_id:
session.add(UserProjectMetadata(user_id=user_id,
tenant_id=tenant_id,
data=metadata))
elif domain_id:
session.add(UserDomainMetadata(user_id=user_id,
domain_id=domain_id,
data=metadata))
session.flush()
return metadata
@handle_conflicts(type='metadata')
def update_metadata(self, user_id, tenant_id, metadata, domain_id=None):
session = self.get_session()
with session.begin():
if tenant_id:
metadata_ref = session.query(UserProjectMetadata)\
.filter_by(user_id=user_id)\
.filter_by(tenant_id=tenant_id)\
.first()
elif domain_id:
metadata_ref = session.query(UserDomainMetadata)\
.filter_by(user_id=user_id)\
.filter_by(domain_id=domain_id)\
.first()
data = metadata_ref.data.copy()
data.update(metadata)
metadata_ref.data = data
session.flush()
return metadata_ref
# domain crud
@handle_conflicts(type='domain')
def create_domain(self, domain_id, domain):
session = self.get_session()
with session.begin():
ref = Domain.from_dict(domain)
session.add(ref)
session.flush()
return ref.to_dict()
def list_domains(self):
session = self.get_session()
refs = session.query(Domain).all()
return [ref.to_dict() for ref in refs]
def get_domain(self, domain_id):
session = self.get_session()
ref = session.query(Domain).filter_by(id=domain_id).first()
if ref is None:
raise exception.DomainNotFound(domain_id=domain_id)
return ref.to_dict()
@handle_conflicts(type='domain')
def update_domain(self, domain_id, domain):
session = self.get_session()
with session.begin():
ref = session.query(Domain).filter_by(id=domain_id).first()
if ref is None:
raise exception.DomainNotFound(domain_id=domain_id)
old_dict = ref.to_dict()
for k in domain:
old_dict[k] = domain[k]
new_domain = Domain.from_dict(old_dict)
for attr in Domain.attributes:
if attr != 'id':
setattr(ref, attr, getattr(new_domain, attr))
ref.extra = new_domain.extra
session.flush()
return ref.to_dict()
def delete_domain(self, domain_id):
session = self.get_session()
ref = session.query(Domain).filter_by(id=domain_id).first()
if not ref:
raise exception.DomainNotFound(domain_id=domain_id)
with session.begin():
session.delete(ref)
session.flush()
# project crud
@handle_conflicts(type='project')
def create_project(self, project_id, project):
return self.create_tenant(project_id, project)
def get_project(self, project_id):
return self.get_tenant(project_id)
def list_projects(self):
return self.get_tenants()
@handle_conflicts(type='project')
def update_project(self, project_id, project):
session = self.get_session()
with session.begin():
ref = session.query(Tenant).filter_by(id=project_id).first()
if ref is None:
raise exception.TenantNotFound(project_id=project_id)
old_dict = ref.to_dict()
for k in project:
old_dict[k] = project[k]
new_project = Tenant.from_dict(old_dict)
for attr in Tenant.attributes:
if attr != 'id':
setattr(ref, attr, getattr(new_project, attr))
ref.extra = new_project.extra
session.flush()
return ref.to_dict()
def delete_project(self, project_id):
return self.delete_tenant(project_id)
def list_user_projects(self, user_id):
session = self.get_session()
user = self.get_user(user_id)
metadata_refs = session\
.query(UserProjectMetadata)\
.filter_by(user_id=user_id)
project_ids = set([x.tenant_id for x in metadata_refs
if x.data.get('roles')])
if user.get('project_id'):
project_ids.add(user['project_id'])
# FIXME(dolph): this should be removed with proper migrations
if user.get('tenant_id'):
project_ids.add(user['tenant_id'])
return [self.get_project(x) for x in project_ids]
# user crud
@handle_conflicts(type='user') @handle_conflicts(type='user')
def create_user(self, user_id, user): def create_user(self, user_id, user):
user['name'] = clean.user_name(user['name']) user['name'] = clean.user_name(user['name'])
@ -317,6 +570,31 @@ class Identity(sql.Base, identity.Driver):
session.flush() session.flush()
return identity.filter_user(user_ref.to_dict()) return identity.filter_user(user_ref.to_dict())
def list_users(self):
session = self.get_session()
user_refs = session.query(User)
return [identity.filter_user(x.to_dict()) for x in user_refs]
def _get_user(self, user_id):
session = self.get_session()
user_ref = session.query(User).filter_by(id=user_id).first()
if not user_ref:
raise exception.UserNotFound(user_id=user_id)
return user_ref.to_dict()
def _get_user_by_name(self, user_name):
session = self.get_session()
user_ref = session.query(User).filter_by(name=user_name).first()
if not user_ref:
raise exception.UserNotFound(user_id=user_name)
return user_ref.to_dict()
def get_user(self, user_id):
return identity.filter_user(self._get_user(user_id))
def get_user_by_name(self, user_name):
return identity.filter_user(self._get_user_by_name(user_name))
@handle_conflicts(type='user') @handle_conflicts(type='user')
def update_user(self, user_id, user): def update_user(self, user_id, user):
if 'name' in user: if 'name' in user:
@ -333,128 +611,151 @@ class Identity(sql.Base, identity.Driver):
for k in user: for k in user:
old_user_dict[k] = user[k] old_user_dict[k] = user[k]
new_user = User.from_dict(old_user_dict) new_user = User.from_dict(old_user_dict)
for attr in User.attributes:
user_ref.name = new_user.name if attr != 'id':
setattr(user_ref, attr, getattr(new_user, attr))
user_ref.extra = new_user.extra user_ref.extra = new_user.extra
session.flush() session.flush()
return identity.filter_user(user_ref.to_dict(include_extra_dict=True)) return identity.filter_user(user_ref.to_dict(include_extra_dict=True))
def delete_user(self, user_id): def delete_user(self, user_id):
session = self.get_session() session = self.get_session()
try:
ref = session.query(User).filter_by(id=user_id).one()
except sql.NotFound:
raise exception.UserNotFound(user_id=user_id)
with session.begin(): with session.begin():
query = session.query(UserTenantMembership) q = session.query(UserTenantMembership)
query = query.filter_by(user_id=user_id) q = q.filter_by(user_id=user_id)
query.delete(False) q.delete(False)
query = session.query(Metadata)
query = query.filter_by(user_id=user_id) q = session.query(UserProjectMetadata)
query.delete(False) q = q.filter_by(user_id=user_id)
q.delete(False)
if not session.query(User).filter_by(id=user_id).delete(False): if not session.query(User).filter_by(id=user_id).delete(False):
raise exception.UserNotFound(user_id=user_id) raise exception.UserNotFound(user_id=user_id)
@handle_conflicts(type='tenant') session.delete(ref)
def create_tenant(self, tenant_id, tenant):
tenant['name'] = clean.tenant_name(tenant['name'])
session = self.get_session()
with session.begin():
tenant_ref = Tenant.from_dict(tenant)
session.add(tenant_ref)
session.flush() session.flush()
return tenant_ref.to_dict()
@handle_conflicts(type='tenant') # credential crud
def update_tenant(self, tenant_id, tenant):
if 'name' in tenant: @handle_conflicts(type='credential')
tenant['name'] = clean.tenant_name(tenant['name']) def create_credential(self, credential_id, credential):
session = self.get_session() session = self.get_session()
with session.begin(): with session.begin():
tenant_ref = session.query(Tenant).filter_by(id=tenant_id).first() ref = Credential.from_dict(credential)
if tenant_ref is None: session.add(ref)
raise exception.TenantNotFound(tenant_id=tenant_id)
old_tenant_dict = tenant_ref.to_dict()
for k in tenant:
old_tenant_dict[k] = tenant[k]
new_tenant = Tenant.from_dict(old_tenant_dict)
tenant_ref.name = new_tenant.name
tenant_ref.extra = new_tenant.extra
session.flush() session.flush()
return tenant_ref.to_dict(include_extra_dict=True) return ref.to_dict()
def delete_tenant(self, tenant_id): def list_credentials(self):
session = self.get_session()
refs = session.query(Credential).all()
return [ref.to_dict() for ref in refs]
def get_credential(self, credential_id):
session = self.get_session()
ref = session.query(Credential).filter_by(id=credential_id).first()
if ref is None:
raise exception.CredentialNotFound(credential_id=credential_id)
return ref.to_dict()
@handle_conflicts(type='credential')
def update_credential(self, credential_id, credential):
session = self.get_session() session = self.get_session()
with session.begin(): with session.begin():
query = session.query(UserTenantMembership) ref = session.query(Credential).filter_by(id=credential_id).first()
query = query.filter_by(tenant_id=tenant_id) if ref is None:
query.delete(False) raise exception.CredentialNotFound(credential_id=credential_id)
query = session.query(Metadata) old_dict = ref.to_dict()
query = query.filter_by(tenant_id=tenant_id) for k in credential:
query.delete(False) old_dict[k] = credential[k]
if not session.query(Tenant).filter_by(id=tenant_id).delete(False): new_credential = Credential.from_dict(old_dict)
raise exception.TenantNotFound(tenant_id=tenant_id) for attr in Credential.attributes:
if attr != 'id':
@handle_conflicts(type='metadata') setattr(ref, attr, getattr(new_credential, attr))
def create_metadata(self, user_id, tenant_id, metadata): ref.extra = new_credential.extra
session = self.get_session()
with session.begin():
session.add(Metadata(user_id=user_id,
tenant_id=tenant_id,
data=metadata))
session.flush() session.flush()
return metadata return ref.to_dict()
@handle_conflicts(type='metadata') def delete_credential(self, credential_id):
def update_metadata(self, user_id, tenant_id, metadata):
session = self.get_session() session = self.get_session()
with session.begin():
query = session.query(Metadata)
query = query.filter_by(user_id=user_id)
query = query.filter_by(tenant_id=tenant_id)
metadata_ref = query.first()
data = metadata_ref.data.copy()
for k in metadata:
data[k] = metadata[k]
metadata_ref.data = data
session.flush()
return metadata_ref
def delete_metadata(self, user_id, tenant_id): try:
self.db.delete('metadata-%s-%s' % (tenant_id, user_id)) ref = session.query(Credential).filter_by(id=credential_id).one()
return None except sql.NotFound:
raise exception.CredentialNotFound(credential_id=credential_id)
with session.begin():
session.delete(ref)
session.flush()
# role crud
@handle_conflicts(type='role') @handle_conflicts(type='role')
def create_role(self, role_id, role): def create_role(self, role_id, role):
session = self.get_session() session = self.get_session()
with session.begin(): with session.begin():
session.add(Role(**role)) ref = Role.from_dict(role)
session.add(ref)
session.flush() session.flush()
return role return ref.to_dict()
def list_roles(self):
session = self.get_session()
refs = session.query(Role).all()
return [ref.to_dict() for ref in refs]
def get_role(self, role_id):
session = self.get_session()
ref = session.query(Role).filter_by(id=role_id).first()
if ref is None:
raise exception.RoleNotFound(role_id=role_id)
return ref.to_dict()
@handle_conflicts(type='role') @handle_conflicts(type='role')
def update_role(self, role_id, role): def update_role(self, role_id, role):
session = self.get_session() session = self.get_session()
with session.begin(): with session.begin():
role_ref = session.query(Role).filter_by(id=role_id).first() ref = session.query(Role).filter_by(id=role_id).first()
if role_ref is None: if ref is None:
raise exception.RoleNotFound(role_id=role_id) raise exception.RoleNotFound(role_id=role_id)
old_dict = ref.to_dict()
for k in role: for k in role:
role_ref[k] = role[k] old_dict[k] = role[k]
new_role = Role.from_dict(old_dict)
for attr in Role.attributes:
if attr != 'id':
setattr(ref, attr, getattr(new_role, attr))
ref.extra = new_role.extra
session.flush() session.flush()
return role_ref return ref.to_dict()
def delete_role(self, role_id): def delete_role(self, role_id):
session = self.get_session() session = self.get_session()
try:
ref = session.query(Role).filter_by(id=role_id).one()
except sql.NotFound:
raise exception.RoleNotFound(role_id=role_id)
with session.begin(): with session.begin():
metadata_refs = session.query(Metadata) for metadata_ref in session.query(UserProjectMetadata):
for metadata_ref in metadata_refs:
metadata = metadata_ref.to_dict() metadata = metadata_ref.to_dict()
user_id = metadata['user_id']
tenant_id = metadata['tenant_id']
try: try:
self.remove_role_from_user_and_tenant(user_id, self.remove_role_from_user_and_tenant(
tenant_id, metadata['user_id'], metadata['tenant_id'], role_id)
role_id)
except exception.RoleNotFound: except exception.RoleNotFound:
pass pass
# FIXME(dolph): user-domain metadata needs to be updated
if not session.query(Role).filter_by(id=role_id).delete(): if not session.query(Role).filter_by(id=role_id).delete():
raise exception.RoleNotFound(role_id=role_id) raise exception.RoleNotFound(role_id=role_id)
session.delete(ref)
session.flush() session.flush()

349
tests/test_v3_identity.py Normal file
View File

@ -0,0 +1,349 @@
import uuid
import test_v3
class IdentityTestCase(test_v3.RestfulTestCase):
"""Test domains, projects, users, credential & role CRUD"""
def setUp(self):
super(IdentityTestCase, self).setUp()
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(
self.domain_id,
self.domain.copy())
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(
self.project_id,
self.project.copy())
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(
self.user_id,
self.user.copy())
self.credential_id = uuid.uuid4().hex
self.credential = self.new_credential_ref(
user_id=self.user_id,
project_id=self.project_id)
self.credential['id'] = self.credential_id
self.identity_api.create_credential(
self.credential_id,
self.credential.copy())
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.identity_api.create_role(
self.role_id,
self.role.copy())
# domain validation
def assertValidDomainListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'domains',
self.assertValidDomain,
ref)
def assertValidDomainResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'domain',
self.assertValidDomain,
ref)
def assertValidDomain(self, entity, ref=None):
if ref:
pass
return entity
# project validation
def assertValidProjectListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'projects',
self.assertValidProject,
ref)
def assertValidProjectResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'project',
self.assertValidProject,
ref)
def assertValidProject(self, entity, ref=None):
self.assertIsNotNone(entity.get('domain_id'))
if ref:
self.assertEqual(ref['domain_id'], entity['domain_id'])
return entity
# user validation
def assertValidUserListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'users',
self.assertValidUser,
ref)
def assertValidUserResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'user',
self.assertValidUser,
ref)
def assertValidUser(self, entity, ref=None):
self.assertIsNotNone(entity.get('domain_id'))
self.assertIsNotNone(entity.get('email'))
self.assertIsNone(entity.get('password'))
if ref:
self.assertEqual(ref['domain_id'], entity['domain_id'])
self.assertEqual(ref['email'], entity['email'])
return entity
# credential validation
def assertValidCredentialListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'credentials',
self.assertValidCredential,
ref)
def assertValidCredentialResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'credential',
self.assertValidCredential,
ref)
def assertValidCredential(self, entity, ref=None):
self.assertIsNotNone(entity.get('user_id'))
self.assertIsNotNone(entity.get('blob'))
self.assertIsNotNone(entity.get('type'))
if ref:
self.assertEqual(ref['user_id'], entity['user_id'])
self.assertEqual(ref['blob'], entity['blob'])
self.assertEqual(ref['type'], entity['type'])
self.assertEqual(ref.get('project_id'), entity.get('project_id'))
return entity
# role validation
def assertValidRoleListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'roles',
self.assertValidRole,
ref)
def assertValidRoleResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'role',
self.assertValidRole,
ref)
def assertValidRole(self, entity, ref=None):
if ref:
pass
return entity
# domain crud tests
def test_create_domain(self):
"""POST /domains"""
ref = self.new_domain_ref()
r = self.post(
'/domains',
body={'domain': ref})
return self.assertValidDomainResponse(r, ref)
def test_list_domains(self):
"""GET /domains"""
r = self.get('/domains')
self.assertValidDomainListResponse(r, self.domain)
def test_get_domain(self):
"""GET /domains/{domain_id}"""
r = self.get('/domains/%(domain_id)s' % {
'domain_id': self.domain_id})
self.assertValidDomainResponse(r, self.domain)
def test_update_domain(self):
"""PATCH /domains/{domain_id}"""
ref = self.new_domain_ref()
del ref['id']
r = self.patch('/domains/%(domain_id)s' % {
'domain_id': self.domain_id},
body={'domain': ref})
self.assertValidDomainResponse(r, ref)
def test_delete_domain(self):
"""DELETE /domains/{domain_id}"""
self.delete('/domains/%(domain_id)s' % {
'domain_id': self.domain_id})
# project crud tests
def test_list_projects(self):
"""GET /projects"""
r = self.get('/projects')
self.assertValidProjectListResponse(r, self.project)
def test_create_project(self):
"""POST /projects"""
ref = self.new_project_ref(domain_id=self.domain_id)
r = self.post(
'/projects',
body={'project': ref})
self.assertValidProjectResponse(r, ref)
def test_get_project(self):
"""GET /projects/{project_id}"""
r = self.get(
'/projects/%(project_id)s' % {
'project_id': self.project_id})
self.assertValidProjectResponse(r, self.project)
def test_update_project(self):
"""PATCH /projects/{project_id}"""
ref = self.new_project_ref(domain_id=self.domain_id)
del ref['id']
r = self.patch(
'/projects/%(project_id)s' % {
'project_id': self.project_id},
body={'project': ref})
self.assertValidProjectResponse(r, ref)
def test_delete_project(self):
"""DELETE /projects/{project_id}"""
self.delete(
'/projects/%(project_id)s' % {
'project_id': self.project_id})
# user crud tests
def test_create_user(self):
"""POST /users"""
ref = self.new_user_ref(domain_id=self.domain_id)
r = self.post(
'/users',
body={'user': ref})
return self.assertValidUserResponse(r, ref)
def test_list_users(self):
"""GET /users"""
r = self.get('/users')
self.assertValidUserListResponse(r, self.user)
def test_get_user(self):
"""GET /users/{user_id}"""
r = self.get('/users/%(user_id)s' % {
'user_id': self.user_id})
self.assertValidUserResponse(r, self.user)
def test_update_user(self):
"""PATCH /users/{user_id}"""
user = self.new_user_ref(domain_id=self.domain_id)
del user['id']
r = self.patch('/users/%(user_id)s' % {
'user_id': self.user_id},
body={'user': user})
self.assertValidUserResponse(r, user)
def test_delete_user(self):
"""DELETE /users/{user_id}"""
self.delete('/users/%(user_id)s' % {
'user_id': self.user_id})
# credential crud tests
def test_list_credentials(self):
"""GET /credentials"""
r = self.get('/credentials')
self.assertValidCredentialListResponse(r, self.credential)
def test_create_credential(self):
"""POST /credentials"""
ref = self.new_credential_ref(user_id=self.user_id)
r = self.post(
'/credentials',
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
def test_get_credential(self):
"""GET /credentials/{credential_id}"""
r = self.get(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential_id})
self.assertValidCredentialResponse(r, self.credential)
def test_update_credential(self):
"""PATCH /credentials/{credential_id}"""
ref = self.new_credential_ref(
user_id=self.user_id,
project_id=self.project_id)
del ref['id']
r = self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential_id},
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
def test_delete_credential(self):
"""DELETE /credentials/{credential_id}"""
self.delete(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential_id})
# role crud tests
def test_create_role(self):
"""POST /roles"""
ref = self.new_role_ref()
r = self.post(
'/roles',
body={'role': ref})
return self.assertValidRoleResponse(r, ref)
def test_list_roles(self):
"""GET /roles"""
r = self.get('/roles')
self.assertValidRoleListResponse(r, self.role)
def test_get_role(self):
"""GET /roles/{role_id}"""
r = self.get('/roles/%(role_id)s' % {
'role_id': self.role_id})
self.assertValidRoleResponse(r, self.role)
def test_update_role(self):
"""PATCH /roles/{role_id}"""
ref = self.new_role_ref()
del ref['id']
r = self.patch('/roles/%(role_id)s' % {
'role_id': self.role_id},
body={'role': ref})
self.assertValidRoleResponse(r, ref)
def test_delete_role(self):
"""DELETE /roles/{role_id}"""
self.delete('/roles/%(role_id)s' % {
'role_id': self.role_id})