Minor encrypt/decrypt unittest refactor

Move encrypt/decrypt functions out of
DBAPICryptParamsPropsTest._test_db_encrypt_decrypt(). This makes it
less complex in the eyes of pep8, which will benefit a future commit.

Also simplify: do not need to pass around template_ids.

Change-Id: I772a3cefc6cccf06990409d7ff8aa3db64f21e6b
This commit is contained in:
Crag Wolfe 2016-08-28 16:57:41 -04:00
parent 882a640f18
commit 0925692ecd
1 changed files with 103 additions and 95 deletions

View File

@ -33,6 +33,7 @@ from heat.common import context
from heat.common import exception
from heat.common import template_format
from heat.db.sqlalchemy import api as db_api
from heat.db.sqlalchemy import models
from heat.engine.clients.os import glance
from heat.engine.clients.os import nova
from heat.engine import environment
@ -3217,6 +3218,13 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
self.stack = create_stack(self.ctx, self.template, self.user_creds)
self.resources = [create_resource(self.ctx, self.stack, name='res1')]
hidden_params_dict = {
'param2': 'bar',
'param_number': '456',
'param_boolean': '1',
'param_map': '{\"test\":\"json\"}',
'param_comma_list': '[\"Hola\", \"Senor\"]'}
def _create_template(self):
"""Initialize sample template."""
self.t = template_format.parse('''
@ -3277,113 +3285,116 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
return db_api.raw_template_create(self.ctx, template)
def _test_db_encrypt_decrypt(self, template_ids, batch_size=50):
hidden_params_dict = {
'param2': 'bar',
'param_number': '456',
'param_boolean': '1',
'param_map': '{\"test\":\"json\"}',
'param_comma_list': '[\"Hola\", \"Senor\"]'}
def encrypt(self, enc_key=None, batch_size=50):
session = self.ctx.session
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
self.assertEqual([], db_api.db_encrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size))
for enc_tmpl in session.query(models.RawTemplate).all():
for param_name in self.hidden_params_dict.keys():
self.assertEqual(
'cryptography_decrypt_v1',
enc_tmpl.environment['parameters'][param_name][0])
self.assertEqual(
'foo', enc_tmpl.environment['parameters']['param1'])
# test that decryption does not store (or encrypt) default
# values in template's environment['parameters']
self.assertIsNone(
enc_tmpl.environment['parameters'].get('param3'))
def encrypt(enc_key=None):
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
self.assertEqual([], db_api.db_encrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size))
for template_id in template_ids:
enc_tmpl = db_api.raw_template_get(self.ctx, template_id)
for param_name in hidden_params_dict.keys():
self.assertEqual(
'cryptography_decrypt_v1',
enc_tmpl.environment['parameters'][param_name][0])
self.assertEqual(
'foo', enc_tmpl.environment['parameters']['param1'])
# test that default parameters are not encrypted
self.assertIsNone(
enc_tmpl.environment['parameters'].get('param3'))
enc_resources = session.query(models.Resource).all()
self.assertNotEqual([], enc_resources)
for enc_resource in enc_resources:
self.assertEqual('cryptography_decrypt_v1',
enc_resource.properties_data['foo1'][0])
ev = enc_tmpl.environment['parameters']['param2'][1]
for resource in self.resources:
enc_prop = db_api.resource_get(self.ctx, resource.id)
self.assertEqual('cryptography_decrypt_v1',
enc_prop.properties_data['foo1'][0])
return ev
ev = enc_tmpl.environment['parameters']['param2'][1]
return ev
def decrypt(encrypt_value, enc_key=None):
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
self.assertEqual([], db_api.db_decrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size))
for template_id in template_ids:
dec_tmpl = db_api.raw_template_get(self.ctx, template_id)
self.assertNotEqual(
encrypt_value,
r_tmpl.environment['parameters']['param2'][1])
# test that default parameters are not encrypted
self.assertIsNone(r_tmpl.environment['parameters'].get(
'param3'))
for param_name, param_value in hidden_params_dict.items():
self.assertEqual(
param_value,
dec_tmpl.environment['parameters'][param_name])
self.assertEqual(
'foo', dec_tmpl.environment['parameters']['param1'])
self.assertIsNone(
dec_tmpl.environment['parameters'].get('param3'))
def decrypt(self, encrypt_value, enc_key=None,
batch_size=50):
session = self.ctx.session
if enc_key is None:
enc_key = cfg.CONF.auth_encryption_key
# test that decryption does store default
# parameter values in raw_template.environment
self.assertIsNone(dec_tmpl.environment['parameters'].get(
'param3'))
self.assertEqual([], db_api.db_decrypt_parameters_and_properties(
self.ctx, enc_key, batch_size=batch_size))
decrypt_value = dec_tmpl.environment['parameters']['param2'][1]
for resource in self.resources:
dec_prop = db_api.resource_get(self.ctx, resource.id)
self.assertEqual('bar1', dec_prop.properties_data['foo1'])
return decrypt_value
for dec_tmpl in session.query(models.RawTemplate).all():
self.assertNotEqual(
encrypt_value,
dec_tmpl.environment['parameters']['param2'][1])
for param_name, param_value in self.hidden_params_dict.items():
self.assertEqual(
param_value,
dec_tmpl.environment['parameters'][param_name])
self.assertEqual(
'foo', dec_tmpl.environment['parameters']['param1'])
self.assertIsNone(
dec_tmpl.environment['parameters'].get('param3'))
for template_id in template_ids:
r_tmpl = db_api.raw_template_get(self.ctx, template_id)
for param_name, param_value in hidden_params_dict.items():
# test that decryption does not store default
# values in template's environment['parameters']
self.assertIsNone(dec_tmpl.environment['parameters'].get(
'param3'))
decrypt_value = dec_tmpl.environment['parameters']['param2'][1]
dec_resources = session.query(models.Resource).all()
self.assertNotEqual([], dec_resources)
for dec_resource in dec_resources:
self.assertEqual('bar1', dec_resource.properties_data['foo1'])
return decrypt_value
def _test_db_encrypt_decrypt(self, batch_size=50):
session = self.ctx.session
raw_templates = session.query(models.RawTemplate).all()
self.assertNotEqual([], raw_templates)
for r_tmpl in raw_templates:
for param_name, param_value in self.hidden_params_dict.items():
self.assertEqual(param_value,
r_tmpl.environment['parameters'][param_name])
self.assertEqual('foo',
r_tmpl.environment['parameters']['param1'])
for resource in self.resources:
resources = session.query(models.Resource).all()
self.assertNotEqual([], resources)
self.assertEqual(len(resources), len(raw_templates))
for resource in resources:
resource = db_api.resource_get(self.ctx, resource.id)
self.assertEqual('bar1', resource.properties_data['foo1'])
# Test encryption
encrypt_value = encrypt()
encrypt_value = self.encrypt(batch_size=batch_size)
# Test that encryption is idempotent
encrypt_value2 = encrypt()
encrypt_value2 = self.encrypt(batch_size=batch_size)
self.assertEqual(encrypt_value, encrypt_value2)
# Test decryption
decrypt_value = decrypt(encrypt_value)
decrypt_value = self.decrypt(encrypt_value, batch_size=batch_size)
# Test that decryption is idempotent
decrypt_value2 = decrypt(encrypt_value)
decrypt_value2 = self.decrypt(encrypt_value, batch_size=batch_size)
self.assertEqual(decrypt_value, decrypt_value2)
# Test using a different encryption key to encrypt & decrypt
encrypt_value3 = encrypt(enc_key='774c15be099ea74123a9b9592ff12680')
decrypt_value3 = decrypt(encrypt_value3,
enc_key='774c15be099ea74123a9b9592ff12680')
self.assertEqual(decrypt_value, decrypt_value2, decrypt_value3)
encrypt_value3 = self.encrypt(
enc_key='774c15be099ea74123a9b9592ff12680',
batch_size=batch_size)
decrypt_value3 = self.decrypt(
encrypt_value3, enc_key='774c15be099ea74123a9b9592ff12680',
batch_size=batch_size)
self.assertEqual(decrypt_value, decrypt_value3)
self.assertNotEqual(encrypt_value, decrypt_value)
self.assertNotEqual(encrypt_value3, decrypt_value3)
self.assertNotEqual(encrypt_value, encrypt_value3)
def _delete_templates(self, template_refs):
for tmpl_ref in template_refs:
db_api.raw_template_delete(self.ctx, tmpl_ref.id)
def test_db_encrypt_decrypt(self):
"""Test encryption and decryption for single template"""
tmpl = self._create_template()
self.addCleanup(self._delete_templates, [tmpl])
self._test_db_encrypt_decrypt([tmpl.id])
self._test_db_encrypt_decrypt()
def test_db_encrypt_decrypt_in_batches(self):
"""Test encryption and decryption in for several templates.
@ -3393,9 +3404,12 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
"""
tmpl1 = self._create_template()
tmpl2 = self._create_template()
self.addCleanup(self._delete_templates, [tmpl1, tmpl2])
template_ids = [tmpl1.id, tmpl2.id]
self._test_db_encrypt_decrypt(template_ids, batch_size=1)
stack = create_stack(self.ctx, tmpl1, self.user_creds)
create_resource(self.ctx, stack, name='res1')
stack2 = create_stack(self.ctx, tmpl2, self.user_creds)
create_resource(self.ctx, stack2, name='res2')
self._test_db_encrypt_decrypt(batch_size=1)
def test_db_encrypt_decrypt_exception_continue(self):
"""Test that encryption and decryption proceed after an exception"""
@ -3427,34 +3441,28 @@ class DBAPICryptParamsPropsTest(common.HeatTestCase):
return db_api.raw_template_create(self.ctx, template)
tmpl1 = create_malformed_template()
tmpl2 = self._create_template()
self.addCleanup(self._delete_templates, [tmpl1, tmpl2])
tmpl1 = db_api.raw_template_get(self.ctx, tmpl1.id)
self.assertEqual('', tmpl1.environment)
create_malformed_template()
self._create_template()
# Test encryption
enc_result = db_api.db_encrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=50)
self.assertEqual(1, len(enc_result))
self.assertIs(AttributeError, type(enc_result[0]))
tmpl1 = db_api.raw_template_get(self.ctx, tmpl1.id)
tmpl2 = db_api.raw_template_get(self.ctx, tmpl2.id)
self.assertEqual('', tmpl1.environment)
enc_tmpls = self.ctx.session.query(models.RawTemplate).all()
self.assertEqual('', enc_tmpls[1].environment)
self.assertEqual('cryptography_decrypt_v1',
tmpl2.environment['parameters']['param2'][0])
enc_tmpls[2].environment['parameters']['param2'][0])
# Test decryption
dec_result = db_api.db_decrypt_parameters_and_properties(
self.ctx, cfg.CONF.auth_encryption_key, batch_size=50)
self.assertEqual(len(dec_result), 1)
self.assertIs(TypeError, type(dec_result[0]))
tmpl1 = db_api.raw_template_get(self.ctx, tmpl1.id)
tmpl2 = db_api.raw_template_get(self.ctx, tmpl2.id)
self.assertEqual('', tmpl1.environment)
dec_tmpls = self.ctx.session.query(models.RawTemplate).all()
self.assertEqual('', dec_tmpls[1].environment)
self.assertEqual('bar',
tmpl2.environment['parameters']['param2'])
dec_tmpls[2].environment['parameters']['param2'])
def test_db_encrypt_no_env(self):
template = {