diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index ce4c8fe7..23b8ebcb 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -1066,6 +1066,67 @@ class DbApiTestCase(DbTestCase): _compare(bw_usages[2], expected_bw_usages[2]) timeutils.clear_time_override() + def test_key_pair_create(self): + ctxt = context.get_admin_context() + values = {'name': 'test_keypair', 'public_key': 'test-public-key', + 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'} + keypair = db.key_pair_create(ctxt, values) + self.assertNotEqual(None, keypair) + for name, value in values.iteritems(): + self.assertEqual(keypair.get(name), value) + + def test_key_pair_create_with_duplicate_name(self): + ctxt = context.get_admin_context() + values = {'name': 'test_keypair', 'public_key': 'test-public-key', + 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'} + keypair = db.key_pair_create(ctxt, values) + self.assertRaises(exception.KeyPairExists, + db.key_pair_create, ctxt, values) + + def test_admin_get_deleted_keypair(self): + # Test deleted keypair can be read by admin user. + ctxt = context.get_admin_context() + values = {'name': 'test_keypair', 'public_key': 'test-public-key', + 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'} + keypair = db.key_pair_create(ctxt, values) + db.key_pair_destroy(ctxt, keypair['user_id'], keypair['name']) + + # Raise exception when read_deleted is 'no'. + self.assertRaises(exception.KeypairNotFound, db.key_pair_get, ctxt, + keypair['user_id'], keypair['name']) + ctxt = ctxt.elevated(read_deleted='yes') + db_keypair = db.key_pair_get(ctxt, keypair['user_id'], + keypair['name']) + self.assertEqual(db_keypair['name'], keypair['name']) + self.assertEqual(db_keypair['deleted'], keypair['id']) + + def test_admin_get_all_keypairs_including_deleted(self): + # Test all deleted/non-deleted keypairs can be read by admin user. + ctxt = context.get_admin_context() + keypair1_values = {'name': 'test_keypair1', + 'public_key': 'test-public-key1', + 'user_id': 'test_user_id', + 'fingerprint': 'test_fingerprint1'} + keypair2_values = {'name': 'test_keypair2', + 'public_key': 'test-public-key2', + 'user_id': 'test_user_id', + 'fingerprint': 'test_fingerprint2'} + keypair1 = db.key_pair_create(ctxt, keypair1_values) + keypair2 = db.key_pair_create(ctxt, keypair2_values) + db.key_pair_destroy(ctxt, keypair1['user_id'], keypair1['name']) + db.key_pair_destroy(ctxt, keypair2['user_id'], keypair2['name']) + # Returns non-deleted keypairs. + result = db.key_pair_get_all_by_user(ctxt, keypair1['user_id']) + self.assertEqual(result, []) + ctxt = ctxt.elevated(read_deleted='yes') + # Returns deleted and non-deleted keypairs. + db_keypairs = db.key_pair_get_all_by_user(ctxt, keypair1['user_id']) + expected_deleted_ids = [keypair1['id'], keypair2['id']] + expected_keypair_names = [keypair1['name'], keypair2['name']] + for keypair in db_keypairs: + self.assertTrue(keypair['name'] in expected_keypair_names) + self.assertTrue(keypair['deleted'] in expected_deleted_ids) + def _get_fake_aggr_values(): return {'name': 'fake_aggregate'} diff --git a/nova/tests/test_migrations.py b/nova/tests/test_migrations.py index 98c50aaa..c78bb6d2 100644 --- a/nova/tests/test_migrations.py +++ b/nova/tests/test_migrations.py @@ -1151,6 +1151,45 @@ class TestNovaMigrations(BaseMigrationTestCase, CommonTestsMixIn): fetchall() self.assertEqual(1, len(uc_name2_rows)) + # migration 173, add unique constraint to keypairs + def _pre_upgrade_173(self, engine): + created_at = [datetime.datetime.now() for x in range(0, 7)] + fake_keypairs = [dict(name='key1', user_id='1a', + created_at=created_at[0], + deleted=0), + dict(name='key1', user_id='1a', + created_at=created_at[1], + deleted=0), + dict(name='key1', user_id='1a', + created_at=created_at[2], + deleted=0) + ] + keypairs = get_table(engine, 'key_pairs') + engine.execute(keypairs.insert(), fake_keypairs) + return fake_keypairs + + def _check_173(self, engine, data): + keypairs = get_table(engine, 'key_pairs') + # Unique constraints are not listed in table.constraints for any db. + # So, simply add a duplicate keypair to check if unique constraint + # is applied to the key_pairs table or not. + insert = keypairs.insert() + duplicate_keypair = dict(name='key4', user_id='4a', + created_at=datetime.datetime.now(), + deleted=0) + insert.execute(duplicate_keypair) + # Insert again + self.assertRaises(sqlalchemy.exc.IntegrityError, insert.execute, + duplicate_keypair) + + # Get all unique records + rows = keypairs.select().\ + where(keypairs.c.deleted != keypairs.c.id).\ + execute().\ + fetchall() + # Ensure the number of unique keypairs is correct + self.assertEqual(len(rows), 2) + class TestBaremetalMigrations(BaseMigrationTestCase, CommonTestsMixIn): """Test sqlalchemy-migrate migrations."""