Ensures duplicate users and tenants can't be made
* adds test for duplicate names and ids for backends * also adds test for rename duplicates and changing ids * makes kvs backend raise an exception if duplicate is requested * ensures kvs backend doesn't allow update of id * makes sure that kvs is reset between tests * cleans up a few imports * fixes bug 927291 * fixes bug 928659 Change-Id: Ia6eb1961796cbde7ed57a75cd9394d77c88cf655
This commit is contained in:
parent
8da000ae0b
commit
f0f8ddeaa8
|
@ -125,6 +125,10 @@ class Identity(kvs.Base, identity.Driver):
|
|||
|
||||
# CRUD
|
||||
def create_user(self, user_id, user):
|
||||
if self.get_user(user_id):
|
||||
raise Exception('Duplicate id')
|
||||
if self.get_user_by_name(user['name']):
|
||||
raise Exception('Duplicate name')
|
||||
user = _ensure_hashed_password(user)
|
||||
self.db.set('user-%s' % user_id, user)
|
||||
self.db.set('user_name-%s' % user['name'], user)
|
||||
|
@ -134,11 +138,16 @@ class Identity(kvs.Base, identity.Driver):
|
|||
return user
|
||||
|
||||
def update_user(self, user_id, user):
|
||||
if 'name' in user:
|
||||
existing = self.db.get('user_name-%s' % user['name'])
|
||||
if existing and user_id != existing['id']:
|
||||
raise Exception('Duplicate name')
|
||||
# get the old name and delete it too
|
||||
old_user = self.db.get('user-%s' % user_id)
|
||||
new_user = old_user.copy()
|
||||
user = _ensure_hashed_password(user)
|
||||
new_user.update(user)
|
||||
new_user['id'] = user_id
|
||||
self.db.delete('user_name-%s' % old_user['name'])
|
||||
self.db.set('user-%s' % user_id, new_user)
|
||||
self.db.set('user_name-%s' % new_user['name'], new_user)
|
||||
|
@ -154,16 +163,26 @@ class Identity(kvs.Base, identity.Driver):
|
|||
return None
|
||||
|
||||
def create_tenant(self, tenant_id, tenant):
|
||||
if self.get_tenant(tenant_id):
|
||||
raise Exception('Duplicate id')
|
||||
if self.get_tenant_by_name(tenant['name']):
|
||||
raise Exception('Duplicate name')
|
||||
self.db.set('tenant-%s' % tenant_id, tenant)
|
||||
self.db.set('tenant_name-%s' % tenant['name'], tenant)
|
||||
return tenant
|
||||
|
||||
def update_tenant(self, tenant_id, tenant):
|
||||
if 'name' in tenant:
|
||||
existing = self.db.get('tenant_name-%s' % tenant['name'])
|
||||
if existing and tenant_id != existing['id']:
|
||||
raise Exception('Duplicate name')
|
||||
# get the old name and delete it too
|
||||
old_tenant = self.db.get('tenant-%s' % tenant_id)
|
||||
new_tenant = old_tenant.copy()
|
||||
new_tenant['id'] = tenant_id
|
||||
self.db.delete('tenant_name-%s' % old_tenant['name'])
|
||||
self.db.set('tenant-%s' % tenant_id, tenant)
|
||||
self.db.set('tenant_name-%s' % tenant['name'], tenant)
|
||||
self.db.set('tenant-%s' % tenant_id, new_tenant)
|
||||
self.db.set('tenant_name-%s' % new_tenant['name'], new_tenant)
|
||||
return tenant
|
||||
|
||||
def delete_tenant(self, tenant_id):
|
||||
|
|
|
@ -8,10 +8,8 @@ import time
|
|||
|
||||
from paste import deploy
|
||||
|
||||
from keystone import catalog
|
||||
from keystone import config
|
||||
from keystone import identity
|
||||
from keystone import token
|
||||
from keystone.common import kvs
|
||||
from keystone.common import logging
|
||||
from keystone.common import utils
|
||||
from keystone.common import wsgi
|
||||
|
@ -119,6 +117,7 @@ class TestCase(unittest.TestCase):
|
|||
for path in self._paths:
|
||||
if path in sys.path:
|
||||
sys.path.remove(path)
|
||||
kvs.INMEMDB.clear()
|
||||
CONF.reset()
|
||||
super(TestCase, self).tearDown()
|
||||
|
||||
|
|
|
@ -1,108 +1,199 @@
|
|||
class IdentityTests(object):
|
||||
def test_authenticate_bad_user(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.identity_api.authenticate,
|
||||
user_id=self.user_foo['id'] + 'WRONG',
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
password=self.user_foo['password'])
|
||||
def test_authenticate_bad_user(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.identity_api.authenticate,
|
||||
user_id=self.user_foo['id'] + 'WRONG',
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
password=self.user_foo['password'])
|
||||
|
||||
def test_authenticate_bad_password(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.identity_api.authenticate,
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
password=self.user_foo['password'] + 'WRONG')
|
||||
def test_authenticate_bad_password(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.identity_api.authenticate,
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
password=self.user_foo['password'] + 'WRONG')
|
||||
|
||||
def test_authenticate_invalid_tenant(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.identity_api.authenticate,
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'] + 'WRONG',
|
||||
password=self.user_foo['password'])
|
||||
def test_authenticate_invalid_tenant(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.identity_api.authenticate,
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'] + 'WRONG',
|
||||
password=self.user_foo['password'])
|
||||
|
||||
def test_authenticate_no_tenant(self):
|
||||
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
|
||||
user_id=self.user_foo['id'],
|
||||
password=self.user_foo['password'])
|
||||
# NOTE(termie): the password field is left in user_foo to make it easier
|
||||
# to authenticate in tests, but should not be returned by
|
||||
# the api
|
||||
self.user_foo.pop('password')
|
||||
self.assertDictEquals(user_ref, self.user_foo)
|
||||
self.assert_(tenant_ref is None)
|
||||
self.assert_(not metadata_ref)
|
||||
def test_authenticate_no_tenant(self):
|
||||
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
|
||||
user_id=self.user_foo['id'],
|
||||
password=self.user_foo['password'])
|
||||
# NOTE(termie): the password field is left in user_foo to make it easier
|
||||
# to authenticate in tests, but should not be returned by
|
||||
# the api
|
||||
self.user_foo.pop('password')
|
||||
self.assertDictEquals(user_ref, self.user_foo)
|
||||
self.assert_(tenant_ref is None)
|
||||
self.assert_(not metadata_ref)
|
||||
|
||||
def test_authenticate(self):
|
||||
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
password=self.user_foo['password'])
|
||||
# NOTE(termie): the password field is left in user_foo to make it easier
|
||||
# to authenticate in tests, but should not be returned by
|
||||
# the api
|
||||
self.user_foo.pop('password')
|
||||
self.assertDictEquals(user_ref, self.user_foo)
|
||||
self.assertDictEquals(tenant_ref, self.tenant_bar)
|
||||
self.assertDictEquals(metadata_ref, self.metadata_foobar)
|
||||
def test_authenticate(self):
|
||||
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
password=self.user_foo['password'])
|
||||
# NOTE(termie): the password field is left in user_foo to make it easier
|
||||
# to authenticate in tests, but should not be returned by
|
||||
# the api
|
||||
self.user_foo.pop('password')
|
||||
self.assertDictEquals(user_ref, self.user_foo)
|
||||
self.assertDictEquals(tenant_ref, self.tenant_bar)
|
||||
self.assertDictEquals(metadata_ref, self.metadata_foobar)
|
||||
|
||||
def test_password_hashed(self):
|
||||
user_ref = self.identity_api._get_user(self.user_foo['id'])
|
||||
self.assertNotEqual(user_ref['password'], self.user_foo['password'])
|
||||
def test_password_hashed(self):
|
||||
user_ref = self.identity_api._get_user(self.user_foo['id'])
|
||||
self.assertNotEqual(user_ref['password'], self.user_foo['password'])
|
||||
|
||||
|
||||
def test_get_tenant_bad_tenant(self):
|
||||
tenant_ref = self.identity_api.get_tenant(
|
||||
tenant_id=self.tenant_bar['id'] + 'WRONG')
|
||||
self.assert_(tenant_ref is None)
|
||||
def test_get_tenant_bad_tenant(self):
|
||||
tenant_ref = self.identity_api.get_tenant(
|
||||
tenant_id=self.tenant_bar['id'] + 'WRONG')
|
||||
self.assert_(tenant_ref is None)
|
||||
|
||||
def test_get_tenant(self):
|
||||
tenant_ref = self.identity_api.get_tenant(tenant_id=self.tenant_bar['id'])
|
||||
self.assertDictEquals(tenant_ref, self.tenant_bar)
|
||||
def test_get_tenant(self):
|
||||
tenant_ref = self.identity_api.get_tenant(tenant_id=self.tenant_bar['id'])
|
||||
self.assertDictEquals(tenant_ref, self.tenant_bar)
|
||||
|
||||
def test_get_tenant_by_name_bad_tenant(self):
|
||||
tenant_ref = self.identity_api.get_tenant(
|
||||
tenant_id=self.tenant_bar['name'] + 'WRONG')
|
||||
self.assert_(tenant_ref is None)
|
||||
def test_get_tenant_by_name_bad_tenant(self):
|
||||
tenant_ref = self.identity_api.get_tenant(
|
||||
tenant_id=self.tenant_bar['name'] + 'WRONG')
|
||||
self.assert_(tenant_ref is None)
|
||||
|
||||
def test_get_tenant_by_name(self):
|
||||
tenant_ref = self.identity_api.get_tenant_by_name(
|
||||
tenant_name=self.tenant_bar['name'])
|
||||
self.assertDictEquals(tenant_ref, self.tenant_bar)
|
||||
def test_get_tenant_by_name(self):
|
||||
tenant_ref = self.identity_api.get_tenant_by_name(
|
||||
tenant_name=self.tenant_bar['name'])
|
||||
self.assertDictEquals(tenant_ref, self.tenant_bar)
|
||||
|
||||
def test_get_user_bad_user(self):
|
||||
user_ref = self.identity_api.get_user(
|
||||
user_id=self.user_foo['id'] + 'WRONG')
|
||||
self.assert_(user_ref is None)
|
||||
def test_get_user_bad_user(self):
|
||||
user_ref = self.identity_api.get_user(
|
||||
user_id=self.user_foo['id'] + 'WRONG')
|
||||
self.assert_(user_ref is None)
|
||||
|
||||
def test_get_user(self):
|
||||
user_ref = self.identity_api.get_user(user_id=self.user_foo['id'])
|
||||
# NOTE(termie): the password field is left in user_foo to make it easier
|
||||
# to authenticate in tests, but should not be returned by
|
||||
# the api
|
||||
self.user_foo.pop('password')
|
||||
self.assertDictEquals(user_ref, self.user_foo)
|
||||
def test_get_user(self):
|
||||
user_ref = self.identity_api.get_user(user_id=self.user_foo['id'])
|
||||
# NOTE(termie): the password field is left in user_foo to make it easier
|
||||
# to authenticate in tests, but should not be returned by
|
||||
# the api
|
||||
self.user_foo.pop('password')
|
||||
self.assertDictEquals(user_ref, self.user_foo)
|
||||
|
||||
def test_get_metadata_bad_user(self):
|
||||
metadata_ref = self.identity_api.get_metadata(
|
||||
user_id=self.user_foo['id'] + 'WRONG',
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
self.assert_(metadata_ref is None)
|
||||
def test_get_metadata_bad_user(self):
|
||||
metadata_ref = self.identity_api.get_metadata(
|
||||
user_id=self.user_foo['id'] + 'WRONG',
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
self.assert_(metadata_ref is None)
|
||||
|
||||
def test_get_metadata_bad_tenant(self):
|
||||
metadata_ref = self.identity_api.get_metadata(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'] + 'WRONG')
|
||||
self.assert_(metadata_ref is None)
|
||||
def test_get_metadata_bad_tenant(self):
|
||||
metadata_ref = self.identity_api.get_metadata(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'] + 'WRONG')
|
||||
self.assert_(metadata_ref is None)
|
||||
|
||||
def test_get_metadata(self):
|
||||
metadata_ref = self.identity_api.get_metadata(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
self.assertDictEquals(metadata_ref, self.metadata_foobar)
|
||||
def test_get_metadata(self):
|
||||
metadata_ref = self.identity_api.get_metadata(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
self.assertDictEquals(metadata_ref, self.metadata_foobar)
|
||||
|
||||
def test_get_role(self):
|
||||
role_ref = self.identity_api.get_role(
|
||||
role_id=self.role_keystone_admin['id'])
|
||||
self.assertDictEquals(role_ref, self.role_keystone_admin)
|
||||
def test_get_role(self):
|
||||
role_ref = self.identity_api.get_role(
|
||||
role_id=self.role_keystone_admin['id'])
|
||||
self.assertDictEquals(role_ref, self.role_keystone_admin)
|
||||
|
||||
def test_create_duplicate_user_id_fails(self):
|
||||
user = {'id': 'fake1',
|
||||
'name': 'fake1',
|
||||
'password': 'fakepass',
|
||||
'tenants': ['bar',]}
|
||||
self.identity_api.create_user('fake1', user)
|
||||
user['name'] = 'fake2'
|
||||
self.assertRaises(Exception,
|
||||
self.identity_api.create_user,
|
||||
'fake1',
|
||||
user)
|
||||
|
||||
def test_create_duplicate_user_name_fails(self):
|
||||
user = {'id': 'fake1',
|
||||
'name': 'fake1',
|
||||
'password': 'fakepass',
|
||||
'tenants': ['bar',]}
|
||||
self.identity_api.create_user('fake1', user)
|
||||
user['id'] = 'fake2'
|
||||
self.assertRaises(Exception,
|
||||
self.identity_api.create_user,
|
||||
'fake2',
|
||||
user)
|
||||
|
||||
def test_rename_duplicate_user_name_fails(self):
|
||||
user1 = {'id': 'fake1',
|
||||
'name': 'fake1',
|
||||
'password': 'fakepass',
|
||||
'tenants': ['bar',]}
|
||||
user2 = {'id': 'fake2',
|
||||
'name': 'fake2',
|
||||
'password': 'fakepass',
|
||||
'tenants': ['bar',]}
|
||||
self.identity_api.create_user('fake1', user1)
|
||||
self.identity_api.create_user('fake2', user2)
|
||||
user2['name'] = 'fake1'
|
||||
self.assertRaises(Exception,
|
||||
self.identity_api.update_user,
|
||||
'fake2',
|
||||
user2)
|
||||
|
||||
def test_update_user_id_does_nothing(self):
|
||||
user = {'id': 'fake1',
|
||||
'name': 'fake1',
|
||||
'password': 'fakepass',
|
||||
'tenants': ['bar',]}
|
||||
self.identity_api.create_user('fake1', user)
|
||||
user['id'] = 'fake2'
|
||||
self.identity_api.update_user('fake1', user)
|
||||
user_ref = self.identity_api.get_user('fake1')
|
||||
self.assertEqual(user_ref['id'], 'fake1')
|
||||
user_ref = self.identity_api.get_user('fake2')
|
||||
self.assert_(user_ref is None)
|
||||
|
||||
def test_create_duplicate_tenant_id_fails(self):
|
||||
tenant = {'id': 'fake1', 'name': 'fake1'}
|
||||
self.identity_api.create_tenant('fake1', tenant)
|
||||
tenant['name'] = 'fake2'
|
||||
self.assertRaises(Exception,
|
||||
self.identity_api.create_tenant,
|
||||
'fake1',
|
||||
tenant)
|
||||
|
||||
def test_create_duplicate_tenant_name_fails(self):
|
||||
tenant = {'id': 'fake1', 'name': 'fake'}
|
||||
self.identity_api.create_tenant('fake1', tenant)
|
||||
tenant['id'] = 'fake2'
|
||||
self.assertRaises(Exception,
|
||||
self.identity_api.create_tenant,
|
||||
'fake1',
|
||||
tenant)
|
||||
|
||||
def test_rename_duplicate_tenant_name_fails(self):
|
||||
tenant1 = {'id': 'fake1', 'name': 'fake1'}
|
||||
tenant2 = {'id': 'fake2', 'name': 'fake2'}
|
||||
self.identity_api.create_tenant('fake1', tenant1)
|
||||
self.identity_api.create_tenant('fake2', tenant2)
|
||||
tenant2['name'] = 'fake1'
|
||||
self.assertRaises(Exception,
|
||||
self.identity_api.update_tenant,
|
||||
'fake2',
|
||||
tenant2)
|
||||
|
||||
def test_update_tenant_id_does_nothing(self):
|
||||
tenant = {'id': 'fake1', 'name': 'fake1'}
|
||||
self.identity_api.create_tenant('fake1', tenant)
|
||||
tenant['id'] = 'fake2'
|
||||
self.identity_api.update_tenant('fake1', tenant)
|
||||
tenant_ref = self.identity_api.get_tenant('fake1')
|
||||
self.assertEqual(tenant_ref['id'], 'fake1')
|
||||
tenant_ref = self.identity_api.get_tenant('fake2')
|
||||
self.assert_(tenant_ref is None)
|
||||
|
|
Loading…
Reference in New Issue