Minor reorg of sqlalchemy encrypt test functions

Move the encrypt and decrypt inner functions to the top of the method to
make it obvious that they don't rely on method scoped variables
(except hidden_params_dict). Pass encrypt_value as an argument to decrypt
for its assertions.

Change-Id: I8f1374a91da83d8d1b86c9d75c4492319051da06
Related-Bug: #1479723
This commit is contained in:
Steve Baker 2016-06-20 11:20:06 +12:00
parent 9e007acc44
commit 750d4e4df9
1 changed files with 37 additions and 44 deletions

View File

@ -3147,8 +3147,7 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
return db_api.raw_template_create(self.ctx, template)
def _test_db_encrypt_decrypt(self, batch_size=50):
session = self.ctx.session
def _test_db_encrypt_decrypt(self, template_ids, batch_size=50):
hidden_params_dict = {
'param2': 'bar',
'param_number': '456',
@ -3156,28 +3155,13 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
'param_map': '{\"test\":\"json\"}',
'param_comma_list': '[\"Hola\", \"Senor\"]'}
raw_templates = session.query(models.RawTemplate).all()
self.assertNotEqual([], raw_templates)
for r_tmpl in raw_templates:
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(param_value,
r_tmpl.environment['parameters'][param_name])
self.assertEqual('foo',
r_tmpl.environment['parameters']['param1'])
resources = session.query(models.Resource).all()
self.assertNotEqual([], resources)
for resource in resources:
self.assertEqual('bar1', resource.properties_data['foo1'])
def encrypt(enc_key=None):
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
self.assertEqual([], db_api.db_encrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size))
session = self.ctx.session
enc_raw_templates = session.query(models.RawTemplate).all()
self.assertNotEqual([], enc_raw_templates)
for enc_tmpl in enc_raw_templates:
for template_id in template_ids:
enc_tmpl = db_api.raw_template_get(self.ctx, template_id)
for param_name in hidden_params_dict.keys():
self.assertEqual(
'cryptography_decrypt_v1',
@ -3188,30 +3172,20 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
self.assertIsNone(
enc_tmpl.environment['parameters'].get('param3'))
encrypt_value = enc_tmpl.environment['parameters']['param2'][1]
enc_resources = session.query(models.Resource).all()
self.assertNotEqual([], enc_resources)
for enc_prop in enc_resources:
ev = enc_tmpl.environment['parameters']['param2'][1]
for resource in self.resources:
enc_prop = db_api.resource_get(self.ctx, resource.id)
self.assertEqual('cryptography_decrypt_v1',
enc_prop.properties_data['foo1'][0])
return encrypt_value
return ev
# Test encryption
encrypt_value = encrypt()
# Test that encryption is idempotent
encrypt_value2 = encrypt()
self.assertEqual(encrypt_value, encrypt_value2)
def decrypt(enc_key=None):
def decrypt(encrypt_value, enc_key=None):
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
self.assertEqual([], db_api.db_decrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size))
session = self.ctx.session
dec_templates = session.query(models.RawTemplate).all()
self.assertNotEqual([], dec_templates)
for dec_tmpl in dec_templates:
for template_id in template_ids:
dec_tmpl = db_api.raw_template_get(self.ctx, template_id)
self.assertNotEqual(
encrypt_value,
r_tmpl.environment['parameters']['param2'][1])
@ -3233,21 +3207,39 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
'param3'))
decrypt_value = dec_tmpl.environment['parameters']['param2'][1]
dec_resources = session.query(models.Resource).all()
self.assertNotEqual([], dec_resources)
for dec_prop in dec_resources:
for resource in self.resources:
dec_prop = db_api.resource_get(self.ctx, resource.id)
self.assertEqual('bar1', dec_prop.properties_data['foo1'])
return decrypt_value
for template_id in template_ids:
r_tmpl = db_api.raw_template_get(self.ctx, template_id)
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(param_value,
r_tmpl.environment['parameters'][param_name])
self.assertEqual('foo',
r_tmpl.environment['parameters']['param1'])
for resource in self.resources:
resource = db_api.resource_get(self.ctx, resource.id)
self.assertEqual('bar1', resource.properties_data['foo1'])
# Test encryption
encrypt_value = encrypt()
# Test that encryption is idempotent
encrypt_value2 = encrypt()
self.assertEqual(encrypt_value, encrypt_value2)
# Test decryption
decrypt_value = decrypt()
decrypt_value = decrypt(encrypt_value)
# Test that decryption is idempotent
decrypt_value2 = decrypt()
decrypt_value2 = decrypt(encrypt_value)
# Test using a different encryption key to encrypt & decrypt
encrypt_value3 = encrypt(enc_key='774c15be099ea74123a9b9592ff12680')
decrypt_value3 = decrypt(enc_key='774c15be099ea74123a9b9592ff12680')
decrypt_value3 = decrypt(encrypt_value3,
enc_key='774c15be099ea74123a9b9592ff12680')
self.assertEqual(decrypt_value, decrypt_value2, decrypt_value3)
self.assertNotEqual(encrypt_value, decrypt_value)
self.assertNotEqual(encrypt_value3, decrypt_value3)
@ -3261,7 +3253,7 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
"""Test encryption and decryption for single template"""
tmpl = self._create_template()
self.addCleanup(self._delete_templates, [tmpl])
self._test_db_encrypt_decrypt()
self._test_db_encrypt_decrypt([tmpl.id])
def test_db_encrypt_decrypt_in_batches(self):
"""Test encryption and decryption in for several templates.
@ -3272,7 +3264,8 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
tmpl1 = self._create_template()
tmpl2 = self._create_template()
self.addCleanup(self._delete_templates, [tmpl1, tmpl2])
self._test_db_encrypt_decrypt(batch_size=1)
template_ids = [tmpl1.id, tmpl2.id]
self._test_db_encrypt_decrypt(template_ids, batch_size=1)
def test_db_encrypt_decrypt_exception_continue(self):
"""Test that encryption and decryption proceed after an exception"""