Merge "Ensures duplicate users and tenants can't be made" into redux

This commit is contained in:
Jenkins 2012-02-08 17:52:32 +00:00 committed by Gerrit Code Review
commit 3364905041
3 changed files with 203 additions and 94 deletions

View File

@ -125,6 +125,10 @@ class Identity(kvs.Base, identity.Driver):
# CRUD
def create_user(self, user_id, user):
if self.get_user(user_id):
raise Exception('Duplicate id')
if self.get_user_by_name(user['name']):
raise Exception('Duplicate name')
user = _ensure_hashed_password(user)
self.db.set('user-%s' % user_id, user)
self.db.set('user_name-%s' % user['name'], user)
@ -134,11 +138,16 @@ class Identity(kvs.Base, identity.Driver):
return user
def update_user(self, user_id, user):
if 'name' in user:
existing = self.db.get('user_name-%s' % user['name'])
if existing and user_id != existing['id']:
raise Exception('Duplicate name')
# get the old name and delete it too
old_user = self.db.get('user-%s' % user_id)
new_user = old_user.copy()
user = _ensure_hashed_password(user)
new_user.update(user)
new_user['id'] = user_id
self.db.delete('user_name-%s' % old_user['name'])
self.db.set('user-%s' % user_id, new_user)
self.db.set('user_name-%s' % new_user['name'], new_user)
@ -154,16 +163,26 @@ class Identity(kvs.Base, identity.Driver):
return None
def create_tenant(self, tenant_id, tenant):
if self.get_tenant(tenant_id):
raise Exception('Duplicate id')
if self.get_tenant_by_name(tenant['name']):
raise Exception('Duplicate name')
self.db.set('tenant-%s' % tenant_id, tenant)
self.db.set('tenant_name-%s' % tenant['name'], tenant)
return tenant
def update_tenant(self, tenant_id, tenant):
if 'name' in tenant:
existing = self.db.get('tenant_name-%s' % tenant['name'])
if existing and tenant_id != existing['id']:
raise Exception('Duplicate name')
# get the old name and delete it too
old_tenant = self.db.get('tenant-%s' % tenant_id)
new_tenant = old_tenant.copy()
new_tenant['id'] = tenant_id
self.db.delete('tenant_name-%s' % old_tenant['name'])
self.db.set('tenant-%s' % tenant_id, tenant)
self.db.set('tenant_name-%s' % tenant['name'], tenant)
self.db.set('tenant-%s' % tenant_id, new_tenant)
self.db.set('tenant_name-%s' % new_tenant['name'], new_tenant)
return tenant
def delete_tenant(self, tenant_id):

View File

@ -8,10 +8,8 @@ import time
from paste import deploy
from keystone import catalog
from keystone import config
from keystone import identity
from keystone import token
from keystone.common import kvs
from keystone.common import logging
from keystone.common import utils
from keystone.common import wsgi
@ -119,6 +117,7 @@ class TestCase(unittest.TestCase):
for path in self._paths:
if path in sys.path:
sys.path.remove(path)
kvs.INMEMDB.clear()
CONF.reset()
super(TestCase, self).tearDown()

View File

@ -105,4 +105,95 @@ class IdentityTests(object):
role_id=self.role_keystone_admin['id'])
self.assertDictEquals(role_ref, self.role_keystone_admin)
def test_create_duplicate_user_id_fails(self):
user = {'id': 'fake1',
'name': 'fake1',
'password': 'fakepass',
'tenants': ['bar',]}
self.identity_api.create_user('fake1', user)
user['name'] = 'fake2'
self.assertRaises(Exception,
self.identity_api.create_user,
'fake1',
user)
def test_create_duplicate_user_name_fails(self):
user = {'id': 'fake1',
'name': 'fake1',
'password': 'fakepass',
'tenants': ['bar',]}
self.identity_api.create_user('fake1', user)
user['id'] = 'fake2'
self.assertRaises(Exception,
self.identity_api.create_user,
'fake2',
user)
def test_rename_duplicate_user_name_fails(self):
user1 = {'id': 'fake1',
'name': 'fake1',
'password': 'fakepass',
'tenants': ['bar',]}
user2 = {'id': 'fake2',
'name': 'fake2',
'password': 'fakepass',
'tenants': ['bar',]}
self.identity_api.create_user('fake1', user1)
self.identity_api.create_user('fake2', user2)
user2['name'] = 'fake1'
self.assertRaises(Exception,
self.identity_api.update_user,
'fake2',
user2)
def test_update_user_id_does_nothing(self):
user = {'id': 'fake1',
'name': 'fake1',
'password': 'fakepass',
'tenants': ['bar',]}
self.identity_api.create_user('fake1', user)
user['id'] = 'fake2'
self.identity_api.update_user('fake1', user)
user_ref = self.identity_api.get_user('fake1')
self.assertEqual(user_ref['id'], 'fake1')
user_ref = self.identity_api.get_user('fake2')
self.assert_(user_ref is None)
def test_create_duplicate_tenant_id_fails(self):
tenant = {'id': 'fake1', 'name': 'fake1'}
self.identity_api.create_tenant('fake1', tenant)
tenant['name'] = 'fake2'
self.assertRaises(Exception,
self.identity_api.create_tenant,
'fake1',
tenant)
def test_create_duplicate_tenant_name_fails(self):
tenant = {'id': 'fake1', 'name': 'fake'}
self.identity_api.create_tenant('fake1', tenant)
tenant['id'] = 'fake2'
self.assertRaises(Exception,
self.identity_api.create_tenant,
'fake1',
tenant)
def test_rename_duplicate_tenant_name_fails(self):
tenant1 = {'id': 'fake1', 'name': 'fake1'}
tenant2 = {'id': 'fake2', 'name': 'fake2'}
self.identity_api.create_tenant('fake1', tenant1)
self.identity_api.create_tenant('fake2', tenant2)
tenant2['name'] = 'fake1'
self.assertRaises(Exception,
self.identity_api.update_tenant,
'fake2',
tenant2)
def test_update_tenant_id_does_nothing(self):
tenant = {'id': 'fake1', 'name': 'fake1'}
self.identity_api.create_tenant('fake1', tenant)
tenant['id'] = 'fake2'
self.identity_api.update_tenant('fake1', tenant)
tenant_ref = self.identity_api.get_tenant('fake1')
self.assertEqual(tenant_ref['id'], 'fake1')
tenant_ref = self.identity_api.get_tenant('fake2')
self.assert_(tenant_ref is None)