Return 409 on creating/importing same name keypair
* Removed the DB look up on creating and importing a keypair. * Added unique constraint on user_id and name columns of key_pairs table. Fixes LP: #1086333. Change-Id: I2fd4697f9269d2be13bd977e65ba4ca4a27b9ac6
This commit is contained in:
@@ -1066,6 +1066,67 @@ class DbApiTestCase(DbTestCase):
|
||||
_compare(bw_usages[2], expected_bw_usages[2])
|
||||
timeutils.clear_time_override()
|
||||
|
||||
def test_key_pair_create(self):
|
||||
ctxt = context.get_admin_context()
|
||||
values = {'name': 'test_keypair', 'public_key': 'test-public-key',
|
||||
'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
|
||||
keypair = db.key_pair_create(ctxt, values)
|
||||
self.assertNotEqual(None, keypair)
|
||||
for name, value in values.iteritems():
|
||||
self.assertEqual(keypair.get(name), value)
|
||||
|
||||
def test_key_pair_create_with_duplicate_name(self):
|
||||
ctxt = context.get_admin_context()
|
||||
values = {'name': 'test_keypair', 'public_key': 'test-public-key',
|
||||
'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
|
||||
keypair = db.key_pair_create(ctxt, values)
|
||||
self.assertRaises(exception.KeyPairExists,
|
||||
db.key_pair_create, ctxt, values)
|
||||
|
||||
def test_admin_get_deleted_keypair(self):
|
||||
# Test deleted keypair can be read by admin user.
|
||||
ctxt = context.get_admin_context()
|
||||
values = {'name': 'test_keypair', 'public_key': 'test-public-key',
|
||||
'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
|
||||
keypair = db.key_pair_create(ctxt, values)
|
||||
db.key_pair_destroy(ctxt, keypair['user_id'], keypair['name'])
|
||||
|
||||
# Raise exception when read_deleted is 'no'.
|
||||
self.assertRaises(exception.KeypairNotFound, db.key_pair_get, ctxt,
|
||||
keypair['user_id'], keypair['name'])
|
||||
ctxt = ctxt.elevated(read_deleted='yes')
|
||||
db_keypair = db.key_pair_get(ctxt, keypair['user_id'],
|
||||
keypair['name'])
|
||||
self.assertEqual(db_keypair['name'], keypair['name'])
|
||||
self.assertEqual(db_keypair['deleted'], keypair['id'])
|
||||
|
||||
def test_admin_get_all_keypairs_including_deleted(self):
|
||||
# Test all deleted/non-deleted keypairs can be read by admin user.
|
||||
ctxt = context.get_admin_context()
|
||||
keypair1_values = {'name': 'test_keypair1',
|
||||
'public_key': 'test-public-key1',
|
||||
'user_id': 'test_user_id',
|
||||
'fingerprint': 'test_fingerprint1'}
|
||||
keypair2_values = {'name': 'test_keypair2',
|
||||
'public_key': 'test-public-key2',
|
||||
'user_id': 'test_user_id',
|
||||
'fingerprint': 'test_fingerprint2'}
|
||||
keypair1 = db.key_pair_create(ctxt, keypair1_values)
|
||||
keypair2 = db.key_pair_create(ctxt, keypair2_values)
|
||||
db.key_pair_destroy(ctxt, keypair1['user_id'], keypair1['name'])
|
||||
db.key_pair_destroy(ctxt, keypair2['user_id'], keypair2['name'])
|
||||
# Returns non-deleted keypairs.
|
||||
result = db.key_pair_get_all_by_user(ctxt, keypair1['user_id'])
|
||||
self.assertEqual(result, [])
|
||||
ctxt = ctxt.elevated(read_deleted='yes')
|
||||
# Returns deleted and non-deleted keypairs.
|
||||
db_keypairs = db.key_pair_get_all_by_user(ctxt, keypair1['user_id'])
|
||||
expected_deleted_ids = [keypair1['id'], keypair2['id']]
|
||||
expected_keypair_names = [keypair1['name'], keypair2['name']]
|
||||
for keypair in db_keypairs:
|
||||
self.assertTrue(keypair['name'] in expected_keypair_names)
|
||||
self.assertTrue(keypair['deleted'] in expected_deleted_ids)
|
||||
|
||||
|
||||
def _get_fake_aggr_values():
|
||||
return {'name': 'fake_aggregate'}
|
||||
|
||||
@@ -1151,6 +1151,45 @@ class TestNovaMigrations(BaseMigrationTestCase, CommonTestsMixIn):
|
||||
fetchall()
|
||||
self.assertEqual(1, len(uc_name2_rows))
|
||||
|
||||
# migration 173, add unique constraint to keypairs
|
||||
def _pre_upgrade_173(self, engine):
|
||||
created_at = [datetime.datetime.now() for x in range(0, 7)]
|
||||
fake_keypairs = [dict(name='key1', user_id='1a',
|
||||
created_at=created_at[0],
|
||||
deleted=0),
|
||||
dict(name='key1', user_id='1a',
|
||||
created_at=created_at[1],
|
||||
deleted=0),
|
||||
dict(name='key1', user_id='1a',
|
||||
created_at=created_at[2],
|
||||
deleted=0)
|
||||
]
|
||||
keypairs = get_table(engine, 'key_pairs')
|
||||
engine.execute(keypairs.insert(), fake_keypairs)
|
||||
return fake_keypairs
|
||||
|
||||
def _check_173(self, engine, data):
|
||||
keypairs = get_table(engine, 'key_pairs')
|
||||
# Unique constraints are not listed in table.constraints for any db.
|
||||
# So, simply add a duplicate keypair to check if unique constraint
|
||||
# is applied to the key_pairs table or not.
|
||||
insert = keypairs.insert()
|
||||
duplicate_keypair = dict(name='key4', user_id='4a',
|
||||
created_at=datetime.datetime.now(),
|
||||
deleted=0)
|
||||
insert.execute(duplicate_keypair)
|
||||
# Insert again
|
||||
self.assertRaises(sqlalchemy.exc.IntegrityError, insert.execute,
|
||||
duplicate_keypair)
|
||||
|
||||
# Get all unique records
|
||||
rows = keypairs.select().\
|
||||
where(keypairs.c.deleted != keypairs.c.id).\
|
||||
execute().\
|
||||
fetchall()
|
||||
# Ensure the number of unique keypairs is correct
|
||||
self.assertEqual(len(rows), 2)
|
||||
|
||||
|
||||
class TestBaremetalMigrations(BaseMigrationTestCase, CommonTestsMixIn):
|
||||
"""Test sqlalchemy-migrate migrations."""
|
||||
|
||||
Reference in New Issue
Block a user