Refactor unit tests for encryption utility and add assertions

Also, re-add the instance variables that were removed from
DBAPICryptParamsPropsTest in 4307f8722c
so the resource properties_data encryption is tested.

Change-Id: I7c720916511474b4f471a03235dddebf62dba085
This commit is contained in:
Jason Dunsmore 2016-03-30 14:41:41 -05:00
parent a56ec8be94
commit 72d3599b90
1 changed files with 136 additions and 135 deletions

View File

@ -2901,6 +2901,10 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
def setUp(self):
super(DBAPICryptParamsPropsTest, self).setUp()
self.ctx = utils.dummy_context()
self.template = self._create_template()
self.user_creds = create_user_creds(self.ctx)
self.stack = create_stack(self.ctx, self.template, self.user_creds)
self.resources = [create_resource(self.ctx, self.stack, name='res1')]
def _create_template(self):
"""Initialize sample template."""
@ -2962,34 +2966,6 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
return db_api.raw_template_create(self.ctx, template)
def _create_malformed_template(self):
"""Initialize a malformed template which should fail the encryption."""
t = template_format.parse('''
heat_template_version: 2013-05-23
parameters:
param1:
type: string
description: value1.
param2:
type: string
description: value2.
hidden: true
param3:
type: string
description: value3
hidden: true
default: "don't encrypt me! I'm not sensitive enough"
resources:
a_resource:
type: GenericResourceType
''')
template = {
'template': t,
'files': {'foo': 'bar'},
'environment': ''} # <- environment should be a dict
return db_api.raw_template_create(self.ctx, template)
def _test_db_encrypt_decrypt(self, batch_size=50):
session = db_api.get_session()
hidden_params_dict = {
@ -2999,126 +2975,102 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
'param_map': '{\"test\":\"json\"}',
'param_comma_list': '[\"Hola\", \"Senor\"]'}
for r_tmpl in session.query(models.RawTemplate).all():
raw_templates = session.query(models.RawTemplate).all()
self.assertTrue(raw_templates)
for r_tmpl in raw_templates:
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(param_value,
r_tmpl.environment['parameters'][param_name])
self.assertEqual('foo',
r_tmpl.environment['parameters']['param1'])
for resource in session.query(models.Resource).all():
resources = session.query(models.Resource).all()
self.assertTrue(resources)
for resource in resources:
self.assertEqual('bar1', resource.properties_data['foo1'])
# Test encryption
db_api.db_encrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=batch_size)
session = db_api.get_session()
for enc_tmpl in session.query(models.RawTemplate).all():
for param_name in hidden_params_dict.keys():
self.assertEqual(
'cryptography_decrypt_v1',
enc_tmpl.environment['parameters'][param_name][0])
self.assertEqual('foo',
enc_tmpl.environment['parameters']['param1'])
self.assertIsNone(enc_tmpl.environment['parameters'].get('param3'))
def encrypt(enc_key=None):
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
db_api.db_encrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size)
session = db_api.get_session()
enc_raw_templates = session.query(models.RawTemplate).all()
self.assertTrue(enc_raw_templates)
for enc_tmpl in enc_raw_templates:
for param_name in hidden_params_dict.keys():
self.assertEqual(
'cryptography_decrypt_v1',
enc_tmpl.environment['parameters'][param_name][0])
self.assertEqual(
'foo', enc_tmpl.environment['parameters']['param1'])
# test that default parameters are not encrypted
self.assertIsNone(
enc_tmpl.environment['parameters'].get('param3'))
encrypt_value = enc_tmpl.environment['parameters']['param2'][1]
for enc_prop in session.query(models.Resource).all():
self.assertEqual('cryptography_decrypt_v1',
enc_prop.properties_data['foo1'][0])
encrypt_value = enc_tmpl.environment['parameters']['param2'][1]
enc_resources = session.query(models.Resource).all()
self.assertTrue(enc_resources)
for enc_prop in enc_resources:
self.assertEqual('cryptography_decrypt_v1',
enc_prop.properties_data['foo1'][0])
return encrypt_value
# Test encryption
encrypt_value = encrypt()
# Test that encryption is idempotent
db_api.db_encrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=batch_size)
session = db_api.get_session()
for enc_tmpl in session.query(models.RawTemplate).all():
for param_name in hidden_params_dict.keys():
self.assertEqual(
'cryptography_decrypt_v1',
enc_tmpl.environment['parameters'][param_name][0])
self.assertEqual('foo',
enc_tmpl.environment['parameters']['param1'])
self.assertIsNone(
enc_tmpl.environment['parameters'].get('param3'))
for enc_prop in session.query(models.Resource).all():
self.assertEqual('cryptography_decrypt_v1',
enc_prop.properties_data['foo1'][0])
encrypt_value2 = encrypt()
self.assertEqual(encrypt_value, encrypt_value2)
def decrypt(enc_key=None):
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
db_api.db_decrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size)
session = db_api.get_session()
dec_templates = session.query(models.RawTemplate).all()
self.assertTrue(dec_templates)
for dec_tmpl in dec_templates:
self.assertNotEqual(
encrypt_value,
r_tmpl.environment['parameters']['param2'][1])
# test that default parameters are not encrypted
self.assertIsNone(r_tmpl.environment['parameters'].get(
'param3'))
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(
param_value,
dec_tmpl.environment['parameters'][param_name])
self.assertEqual(
'foo', dec_tmpl.environment['parameters']['param1'])
self.assertIsNone(
dec_tmpl.environment['parameters'].get('param3'))
# test that decryption does store default
# parameter values in raw_template.environment
self.assertIsNone(dec_tmpl.environment['parameters'].get(
'param3'))
decrypt_value = dec_tmpl.environment['parameters']['param2'][1]
dec_resources = session.query(models.Resource).all()
self.assertTrue(dec_resources)
for dec_prop in dec_resources:
self.assertEqual('bar1', dec_prop.properties_data['foo1'])
return decrypt_value
# Test decryption
db_api.db_decrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=batch_size)
session = db_api.get_session()
for dec_tmpl in session.query(models.RawTemplate).all():
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(
param_value,
dec_tmpl.environment['parameters'][param_name])
self.assertEqual('foo',
dec_tmpl.environment['parameters']['param1'])
self.assertIsNone(
dec_tmpl.environment['parameters'].get('param3'))
for dec_prop in session.query(models.Resource).all():
self.assertEqual('bar1', dec_prop.properties_data['foo1'])
decrypt_value = decrypt()
# Test that decryption is idempotent
db_api.db_decrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=batch_size)
session = db_api.get_session()
for dec_tmpl in session.query(models.RawTemplate).all():
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(
param_value,
dec_tmpl.environment['parameters'][param_name])
self.assertEqual('foo',
dec_tmpl.environment['parameters']['param1'])
self.assertIsNone(
dec_tmpl.environment['parameters'].get('param3'))
for dec_prop in session.query(models.Resource).all():
self.assertEqual('bar1', dec_prop.properties_data['foo1'])
decrypt_value2 = decrypt()
# Test using a different encryption key to decrypt
db_api.db_encrypt_parameters_and_properties(
self.ctx, '774c15be099ea74123a9b9592ff12680',
batch_size=batch_size)
session = db_api.get_session()
for r_tmpl in session.query(models.RawTemplate).all():
self.assertNotEqual(encrypt_value,
r_tmpl.environment['parameters']['param2'][1])
# test that default parameters are not encrypted
self.assertIsNone(r_tmpl.environment['parameters'].get('param3'))
db_api.db_decrypt_parameters_and_properties(
self.ctx, '774c15be099ea74123a9b9592ff12680',
batch_size=batch_size)
session = db_api.get_session()
for r_tmpl in session.query(models.RawTemplate).all():
self.assertEqual('bar',
r_tmpl.environment['parameters']['param2'])
# test that decryption does store default parameter values in
# raw_template.environment
self.assertIsNone(r_tmpl.environment['parameters'].get('param3'))
def _test_db_encrypt_decrypt_malformed(self, batch_size=50):
session = db_api.get_session()
r_tmpls = session.query(models.RawTemplate).all()
self.assertEqual('', r_tmpls[0].environment)
# Test encryption
db_api.db_encrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=batch_size)
session = db_api.get_session()
enc_tmpls = session.query(models.RawTemplate).all()
self.assertEqual('', enc_tmpls[0].environment)
self.assertEqual('cryptography_decrypt_v1',
enc_tmpls[1].environment['parameters']['param2'][0])
# Test decryption
db_api.db_decrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=batch_size)
session = db_api.get_session()
dec_tmpls = session.query(models.RawTemplate).all()
self.assertEqual('', dec_tmpls[0].environment)
self.assertEqual('bar',
dec_tmpls[1].environment['parameters']['param2'])
# Test using a different encryption key to encrypt & decrypt
encrypt_value3 = encrypt(enc_key='774c15be099ea74123a9b9592ff12680')
decrypt_value3 = decrypt(enc_key='774c15be099ea74123a9b9592ff12680')
self.assertEqual(decrypt_value, decrypt_value2, decrypt_value3)
self.assertNotEqual(encrypt_value, decrypt_value)
self.assertNotEqual(encrypt_value3, decrypt_value3)
self.assertNotEqual(encrypt_value, encrypt_value3)
def _delete_templates(self, template_refs):
for tmpl_ref in template_refs:
@ -3143,7 +3095,56 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
def test_db_encrypt_decrypt_exception_continue(self):
"""Test that encryption and decryption proceed after an exception"""
tmpl1 = self._create_malformed_template()
def create_malformed_template():
"""Initialize a malformed template which should fail encryption."""
t = template_format.parse('''
heat_template_version: 2013-05-23
parameters:
param1:
type: string
description: value1.
param2:
type: string
description: value2.
hidden: true
param3:
type: string
description: value3
hidden: true
default: "don't encrypt me! I'm not sensitive enough"
resources:
a_resource:
type: GenericResourceType
''')
template = {
'template': t,
'files': {'foo': 'bar'},
'environment': ''} # <- environment should be a dict
return db_api.raw_template_create(self.ctx, template)
tmpl1 = create_malformed_template()
tmpl2 = self._create_template()
self.addCleanup(self._delete_templates, [tmpl1, tmpl2])
self._test_db_encrypt_decrypt_malformed()
session = db_api.get_session()
r_tmpls = session.query(models.RawTemplate).all()
self.assertEqual('', r_tmpls[1].environment)
# Test encryption
db_api.db_encrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=50)
session = db_api.get_session()
enc_tmpls = session.query(models.RawTemplate).all()
self.assertEqual('', enc_tmpls[1].environment)
self.assertEqual('cryptography_decrypt_v1',
enc_tmpls[2].environment['parameters']['param2'][0])
# Test decryption
db_api.db_decrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=50)
session = db_api.get_session()
dec_tmpls = session.query(models.RawTemplate).all()
self.assertEqual('', dec_tmpls[1].environment)
self.assertEqual('bar',
dec_tmpls[2].environment['parameters']['param2'])