diff --git a/keystone/identity/backends/kvs.py b/keystone/identity/backends/kvs.py index 8215054b49..7dfc863370 100644 --- a/keystone/identity/backends/kvs.py +++ b/keystone/identity/backends/kvs.py @@ -125,6 +125,10 @@ class Identity(kvs.Base, identity.Driver): # CRUD def create_user(self, user_id, user): + if self.get_user(user_id): + raise Exception('Duplicate id') + if self.get_user_by_name(user['name']): + raise Exception('Duplicate name') user = _ensure_hashed_password(user) self.db.set('user-%s' % user_id, user) self.db.set('user_name-%s' % user['name'], user) @@ -134,11 +138,16 @@ class Identity(kvs.Base, identity.Driver): return user def update_user(self, user_id, user): + if 'name' in user: + existing = self.db.get('user_name-%s' % user['name']) + if existing and user_id != existing['id']: + raise Exception('Duplicate name') # get the old name and delete it too old_user = self.db.get('user-%s' % user_id) new_user = old_user.copy() user = _ensure_hashed_password(user) new_user.update(user) + new_user['id'] = user_id self.db.delete('user_name-%s' % old_user['name']) self.db.set('user-%s' % user_id, new_user) self.db.set('user_name-%s' % new_user['name'], new_user) @@ -154,16 +163,26 @@ class Identity(kvs.Base, identity.Driver): return None def create_tenant(self, tenant_id, tenant): + if self.get_tenant(tenant_id): + raise Exception('Duplicate id') + if self.get_tenant_by_name(tenant['name']): + raise Exception('Duplicate name') self.db.set('tenant-%s' % tenant_id, tenant) self.db.set('tenant_name-%s' % tenant['name'], tenant) return tenant def update_tenant(self, tenant_id, tenant): + if 'name' in tenant: + existing = self.db.get('tenant_name-%s' % tenant['name']) + if existing and tenant_id != existing['id']: + raise Exception('Duplicate name') # get the old name and delete it too old_tenant = self.db.get('tenant-%s' % tenant_id) + new_tenant = old_tenant.copy() + new_tenant['id'] = tenant_id self.db.delete('tenant_name-%s' % old_tenant['name']) - self.db.set('tenant-%s' % tenant_id, tenant) - self.db.set('tenant_name-%s' % tenant['name'], tenant) + self.db.set('tenant-%s' % tenant_id, new_tenant) + self.db.set('tenant_name-%s' % new_tenant['name'], new_tenant) return tenant def delete_tenant(self, tenant_id): diff --git a/keystone/test.py b/keystone/test.py index 720b4db832..1eeff449ab 100644 --- a/keystone/test.py +++ b/keystone/test.py @@ -8,10 +8,8 @@ import time from paste import deploy -from keystone import catalog from keystone import config -from keystone import identity -from keystone import token +from keystone.common import kvs from keystone.common import logging from keystone.common import utils from keystone.common import wsgi @@ -119,6 +117,7 @@ class TestCase(unittest.TestCase): for path in self._paths: if path in sys.path: sys.path.remove(path) + kvs.INMEMDB.clear() CONF.reset() super(TestCase, self).tearDown() diff --git a/tests/test_backend.py b/tests/test_backend.py index 151700aa23..b106f23f2c 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,108 +1,199 @@ class IdentityTests(object): - def test_authenticate_bad_user(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - user_id=self.user_foo['id'] + 'WRONG', - tenant_id=self.tenant_bar['id'], - password=self.user_foo['password']) + def test_authenticate_bad_user(self): + self.assertRaises(AssertionError, + self.identity_api.authenticate, + user_id=self.user_foo['id'] + 'WRONG', + tenant_id=self.tenant_bar['id'], + password=self.user_foo['password']) - def test_authenticate_bad_password(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - user_id=self.user_foo['id'], - tenant_id=self.tenant_bar['id'], - password=self.user_foo['password'] + 'WRONG') + def test_authenticate_bad_password(self): + self.assertRaises(AssertionError, + self.identity_api.authenticate, + user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id'], + password=self.user_foo['password'] + 'WRONG') - def test_authenticate_invalid_tenant(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - user_id=self.user_foo['id'], - tenant_id=self.tenant_bar['id'] + 'WRONG', - password=self.user_foo['password']) + def test_authenticate_invalid_tenant(self): + self.assertRaises(AssertionError, + self.identity_api.authenticate, + user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id'] + 'WRONG', + password=self.user_foo['password']) - def test_authenticate_no_tenant(self): - user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( - user_id=self.user_foo['id'], - password=self.user_foo['password']) - # NOTE(termie): the password field is left in user_foo to make it easier - # to authenticate in tests, but should not be returned by - # the api - self.user_foo.pop('password') - self.assertDictEquals(user_ref, self.user_foo) - self.assert_(tenant_ref is None) - self.assert_(not metadata_ref) + def test_authenticate_no_tenant(self): + user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( + user_id=self.user_foo['id'], + password=self.user_foo['password']) + # NOTE(termie): the password field is left in user_foo to make it easier + # to authenticate in tests, but should not be returned by + # the api + self.user_foo.pop('password') + self.assertDictEquals(user_ref, self.user_foo) + self.assert_(tenant_ref is None) + self.assert_(not metadata_ref) - def test_authenticate(self): - user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( - user_id=self.user_foo['id'], - tenant_id=self.tenant_bar['id'], - password=self.user_foo['password']) - # NOTE(termie): the password field is left in user_foo to make it easier - # to authenticate in tests, but should not be returned by - # the api - self.user_foo.pop('password') - self.assertDictEquals(user_ref, self.user_foo) - self.assertDictEquals(tenant_ref, self.tenant_bar) - self.assertDictEquals(metadata_ref, self.metadata_foobar) + def test_authenticate(self): + user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( + user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id'], + password=self.user_foo['password']) + # NOTE(termie): the password field is left in user_foo to make it easier + # to authenticate in tests, but should not be returned by + # the api + self.user_foo.pop('password') + self.assertDictEquals(user_ref, self.user_foo) + self.assertDictEquals(tenant_ref, self.tenant_bar) + self.assertDictEquals(metadata_ref, self.metadata_foobar) - def test_password_hashed(self): - user_ref = self.identity_api._get_user(self.user_foo['id']) - self.assertNotEqual(user_ref['password'], self.user_foo['password']) + def test_password_hashed(self): + user_ref = self.identity_api._get_user(self.user_foo['id']) + self.assertNotEqual(user_ref['password'], self.user_foo['password']) - def test_get_tenant_bad_tenant(self): - tenant_ref = self.identity_api.get_tenant( - tenant_id=self.tenant_bar['id'] + 'WRONG') - self.assert_(tenant_ref is None) + def test_get_tenant_bad_tenant(self): + tenant_ref = self.identity_api.get_tenant( + tenant_id=self.tenant_bar['id'] + 'WRONG') + self.assert_(tenant_ref is None) - def test_get_tenant(self): - tenant_ref = self.identity_api.get_tenant(tenant_id=self.tenant_bar['id']) - self.assertDictEquals(tenant_ref, self.tenant_bar) + def test_get_tenant(self): + tenant_ref = self.identity_api.get_tenant(tenant_id=self.tenant_bar['id']) + self.assertDictEquals(tenant_ref, self.tenant_bar) - def test_get_tenant_by_name_bad_tenant(self): - tenant_ref = self.identity_api.get_tenant( - tenant_id=self.tenant_bar['name'] + 'WRONG') - self.assert_(tenant_ref is None) + def test_get_tenant_by_name_bad_tenant(self): + tenant_ref = self.identity_api.get_tenant( + tenant_id=self.tenant_bar['name'] + 'WRONG') + self.assert_(tenant_ref is None) - def test_get_tenant_by_name(self): - tenant_ref = self.identity_api.get_tenant_by_name( - tenant_name=self.tenant_bar['name']) - self.assertDictEquals(tenant_ref, self.tenant_bar) + def test_get_tenant_by_name(self): + tenant_ref = self.identity_api.get_tenant_by_name( + tenant_name=self.tenant_bar['name']) + self.assertDictEquals(tenant_ref, self.tenant_bar) - def test_get_user_bad_user(self): - user_ref = self.identity_api.get_user( - user_id=self.user_foo['id'] + 'WRONG') - self.assert_(user_ref is None) + def test_get_user_bad_user(self): + user_ref = self.identity_api.get_user( + user_id=self.user_foo['id'] + 'WRONG') + self.assert_(user_ref is None) - def test_get_user(self): - user_ref = self.identity_api.get_user(user_id=self.user_foo['id']) - # NOTE(termie): the password field is left in user_foo to make it easier - # to authenticate in tests, but should not be returned by - # the api - self.user_foo.pop('password') - self.assertDictEquals(user_ref, self.user_foo) + def test_get_user(self): + user_ref = self.identity_api.get_user(user_id=self.user_foo['id']) + # NOTE(termie): the password field is left in user_foo to make it easier + # to authenticate in tests, but should not be returned by + # the api + self.user_foo.pop('password') + self.assertDictEquals(user_ref, self.user_foo) - def test_get_metadata_bad_user(self): - metadata_ref = self.identity_api.get_metadata( - user_id=self.user_foo['id'] + 'WRONG', - tenant_id=self.tenant_bar['id']) - self.assert_(metadata_ref is None) + def test_get_metadata_bad_user(self): + metadata_ref = self.identity_api.get_metadata( + user_id=self.user_foo['id'] + 'WRONG', + tenant_id=self.tenant_bar['id']) + self.assert_(metadata_ref is None) - def test_get_metadata_bad_tenant(self): - metadata_ref = self.identity_api.get_metadata( - user_id=self.user_foo['id'], - tenant_id=self.tenant_bar['id'] + 'WRONG') - self.assert_(metadata_ref is None) + def test_get_metadata_bad_tenant(self): + metadata_ref = self.identity_api.get_metadata( + user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id'] + 'WRONG') + self.assert_(metadata_ref is None) - def test_get_metadata(self): - metadata_ref = self.identity_api.get_metadata( - user_id=self.user_foo['id'], - tenant_id=self.tenant_bar['id']) - self.assertDictEquals(metadata_ref, self.metadata_foobar) + def test_get_metadata(self): + metadata_ref = self.identity_api.get_metadata( + user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id']) + self.assertDictEquals(metadata_ref, self.metadata_foobar) - def test_get_role(self): - role_ref = self.identity_api.get_role( - role_id=self.role_keystone_admin['id']) - self.assertDictEquals(role_ref, self.role_keystone_admin) + def test_get_role(self): + role_ref = self.identity_api.get_role( + role_id=self.role_keystone_admin['id']) + self.assertDictEquals(role_ref, self.role_keystone_admin) + def test_create_duplicate_user_id_fails(self): + user = {'id': 'fake1', + 'name': 'fake1', + 'password': 'fakepass', + 'tenants': ['bar',]} + self.identity_api.create_user('fake1', user) + user['name'] = 'fake2' + self.assertRaises(Exception, + self.identity_api.create_user, + 'fake1', + user) + def test_create_duplicate_user_name_fails(self): + user = {'id': 'fake1', + 'name': 'fake1', + 'password': 'fakepass', + 'tenants': ['bar',]} + self.identity_api.create_user('fake1', user) + user['id'] = 'fake2' + self.assertRaises(Exception, + self.identity_api.create_user, + 'fake2', + user) + + def test_rename_duplicate_user_name_fails(self): + user1 = {'id': 'fake1', + 'name': 'fake1', + 'password': 'fakepass', + 'tenants': ['bar',]} + user2 = {'id': 'fake2', + 'name': 'fake2', + 'password': 'fakepass', + 'tenants': ['bar',]} + self.identity_api.create_user('fake1', user1) + self.identity_api.create_user('fake2', user2) + user2['name'] = 'fake1' + self.assertRaises(Exception, + self.identity_api.update_user, + 'fake2', + user2) + + def test_update_user_id_does_nothing(self): + user = {'id': 'fake1', + 'name': 'fake1', + 'password': 'fakepass', + 'tenants': ['bar',]} + self.identity_api.create_user('fake1', user) + user['id'] = 'fake2' + self.identity_api.update_user('fake1', user) + user_ref = self.identity_api.get_user('fake1') + self.assertEqual(user_ref['id'], 'fake1') + user_ref = self.identity_api.get_user('fake2') + self.assert_(user_ref is None) + + def test_create_duplicate_tenant_id_fails(self): + tenant = {'id': 'fake1', 'name': 'fake1'} + self.identity_api.create_tenant('fake1', tenant) + tenant['name'] = 'fake2' + self.assertRaises(Exception, + self.identity_api.create_tenant, + 'fake1', + tenant) + + def test_create_duplicate_tenant_name_fails(self): + tenant = {'id': 'fake1', 'name': 'fake'} + self.identity_api.create_tenant('fake1', tenant) + tenant['id'] = 'fake2' + self.assertRaises(Exception, + self.identity_api.create_tenant, + 'fake1', + tenant) + + def test_rename_duplicate_tenant_name_fails(self): + tenant1 = {'id': 'fake1', 'name': 'fake1'} + tenant2 = {'id': 'fake2', 'name': 'fake2'} + self.identity_api.create_tenant('fake1', tenant1) + self.identity_api.create_tenant('fake2', tenant2) + tenant2['name'] = 'fake1' + self.assertRaises(Exception, + self.identity_api.update_tenant, + 'fake2', + tenant2) + + def test_update_tenant_id_does_nothing(self): + tenant = {'id': 'fake1', 'name': 'fake1'} + self.identity_api.create_tenant('fake1', tenant) + tenant['id'] = 'fake2' + self.identity_api.update_tenant('fake1', tenant) + tenant_ref = self.identity_api.get_tenant('fake1') + self.assertEqual(tenant_ref['id'], 'fake1') + tenant_ref = self.identity_api.get_tenant('fake2') + self.assert_(tenant_ref is None)