Merge "Return 409 on creating/importing same name keypair"
This commit is contained in:
		| @@ -1066,6 +1066,67 @@ class DbApiTestCase(DbTestCase): | ||||
|         _compare(bw_usages[2], expected_bw_usages[2]) | ||||
|         timeutils.clear_time_override() | ||||
|  | ||||
|     def test_key_pair_create(self): | ||||
|         ctxt = context.get_admin_context() | ||||
|         values = {'name': 'test_keypair', 'public_key': 'test-public-key', | ||||
|                 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'} | ||||
|         keypair = db.key_pair_create(ctxt, values) | ||||
|         self.assertNotEqual(None, keypair) | ||||
|         for name, value in values.iteritems(): | ||||
|             self.assertEqual(keypair.get(name), value) | ||||
|  | ||||
|     def test_key_pair_create_with_duplicate_name(self): | ||||
|         ctxt = context.get_admin_context() | ||||
|         values = {'name': 'test_keypair', 'public_key': 'test-public-key', | ||||
|                 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'} | ||||
|         keypair = db.key_pair_create(ctxt, values) | ||||
|         self.assertRaises(exception.KeyPairExists, | ||||
|                           db.key_pair_create, ctxt, values) | ||||
|  | ||||
|     def test_admin_get_deleted_keypair(self): | ||||
|         # Test deleted keypair can be read by admin user. | ||||
|         ctxt = context.get_admin_context() | ||||
|         values = {'name': 'test_keypair', 'public_key': 'test-public-key', | ||||
|                  'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'} | ||||
|         keypair = db.key_pair_create(ctxt, values) | ||||
|         db.key_pair_destroy(ctxt, keypair['user_id'], keypair['name']) | ||||
|  | ||||
|         # Raise exception when read_deleted is 'no'. | ||||
|         self.assertRaises(exception.KeypairNotFound, db.key_pair_get, ctxt, | ||||
|                           keypair['user_id'], keypair['name']) | ||||
|         ctxt = ctxt.elevated(read_deleted='yes') | ||||
|         db_keypair = db.key_pair_get(ctxt, keypair['user_id'], | ||||
|                                      keypair['name']) | ||||
|         self.assertEqual(db_keypair['name'], keypair['name']) | ||||
|         self.assertEqual(db_keypair['deleted'], keypair['id']) | ||||
|  | ||||
|     def test_admin_get_all_keypairs_including_deleted(self): | ||||
|         # Test all deleted/non-deleted keypairs can be read by admin user. | ||||
|         ctxt = context.get_admin_context() | ||||
|         keypair1_values = {'name': 'test_keypair1', | ||||
|                            'public_key': 'test-public-key1', | ||||
|                            'user_id': 'test_user_id', | ||||
|                            'fingerprint': 'test_fingerprint1'} | ||||
|         keypair2_values = {'name': 'test_keypair2', | ||||
|                            'public_key': 'test-public-key2', | ||||
|                            'user_id': 'test_user_id', | ||||
|                            'fingerprint': 'test_fingerprint2'} | ||||
|         keypair1 = db.key_pair_create(ctxt, keypair1_values) | ||||
|         keypair2 = db.key_pair_create(ctxt, keypair2_values) | ||||
|         db.key_pair_destroy(ctxt, keypair1['user_id'], keypair1['name']) | ||||
|         db.key_pair_destroy(ctxt, keypair2['user_id'], keypair2['name']) | ||||
|         # Returns non-deleted keypairs. | ||||
|         result = db.key_pair_get_all_by_user(ctxt, keypair1['user_id']) | ||||
|         self.assertEqual(result, []) | ||||
|         ctxt = ctxt.elevated(read_deleted='yes') | ||||
|         # Returns deleted and non-deleted keypairs. | ||||
|         db_keypairs = db.key_pair_get_all_by_user(ctxt, keypair1['user_id']) | ||||
|         expected_deleted_ids = [keypair1['id'], keypair2['id']] | ||||
|         expected_keypair_names = [keypair1['name'], keypair2['name']] | ||||
|         for keypair in db_keypairs: | ||||
|             self.assertTrue(keypair['name'] in expected_keypair_names) | ||||
|             self.assertTrue(keypair['deleted'] in expected_deleted_ids) | ||||
|  | ||||
|  | ||||
| def _get_fake_aggr_values(): | ||||
|     return {'name': 'fake_aggregate'} | ||||
|   | ||||
| @@ -1151,6 +1151,45 @@ class TestNovaMigrations(BaseMigrationTestCase, CommonTestsMixIn): | ||||
|                     fetchall() | ||||
|         self.assertEqual(1, len(uc_name2_rows)) | ||||
|  | ||||
|     # migration 173, add unique constraint to keypairs | ||||
|     def _pre_upgrade_173(self, engine): | ||||
|         created_at = [datetime.datetime.now() for x in range(0, 7)] | ||||
|         fake_keypairs = [dict(name='key1', user_id='1a', | ||||
|                               created_at=created_at[0], | ||||
|                               deleted=0), | ||||
|                          dict(name='key1', user_id='1a', | ||||
|                               created_at=created_at[1], | ||||
|                               deleted=0), | ||||
|                          dict(name='key1', user_id='1a', | ||||
|                               created_at=created_at[2], | ||||
|                               deleted=0) | ||||
|                          ] | ||||
|         keypairs = get_table(engine, 'key_pairs') | ||||
|         engine.execute(keypairs.insert(), fake_keypairs) | ||||
|         return fake_keypairs | ||||
|  | ||||
|     def _check_173(self, engine, data): | ||||
|         keypairs = get_table(engine, 'key_pairs') | ||||
|         # Unique constraints are not listed in table.constraints for any db. | ||||
|         # So, simply add a duplicate keypair to check if unique constraint | ||||
|         # is applied to the key_pairs table or not. | ||||
|         insert = keypairs.insert() | ||||
|         duplicate_keypair = dict(name='key4', user_id='4a', | ||||
|                         created_at=datetime.datetime.now(), | ||||
|                         deleted=0) | ||||
|         insert.execute(duplicate_keypair) | ||||
|         # Insert again | ||||
|         self.assertRaises(sqlalchemy.exc.IntegrityError, insert.execute, | ||||
|                           duplicate_keypair) | ||||
|  | ||||
|         # Get all unique records | ||||
|         rows = keypairs.select().\ | ||||
|                      where(keypairs.c.deleted != keypairs.c.id).\ | ||||
|                      execute().\ | ||||
|                      fetchall() | ||||
|         # Ensure the number of unique keypairs is correct | ||||
|         self.assertEqual(len(rows), 2) | ||||
|  | ||||
|  | ||||
| class TestBaremetalMigrations(BaseMigrationTestCase, CommonTestsMixIn): | ||||
|     """Test sqlalchemy-migrate migrations.""" | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jenkins
					Jenkins