Merge "Move credentials logic into config.py"

This commit is contained in:
Zuul 2019-06-20 20:52:49 +00:00 committed by Gerrit Code Review
commit 9f70194e4c
7 changed files with 129 additions and 152 deletions

View File

@ -25,7 +25,6 @@ from pegleg.engine import bundle
from pegleg.engine import catalog from pegleg.engine import catalog
from pegleg.engine.secrets import wrap_secret from pegleg.engine.secrets import wrap_secret
from pegleg.engine.util import files from pegleg.engine.util import files
from pegleg.engine.util.pegleg_secret_management import PeglegSecretManagement
from pegleg.engine.util.shipyard_helper import ShipyardHelper from pegleg.engine.util.shipyard_helper import ShipyardHelper
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -542,17 +541,7 @@ def wrap_secret_cli(*, site_name, author, filename, output_path, schema,
'to genesis.sh script.') 'to genesis.sh script.')
@SITE_REPOSITORY_ARGUMENT @SITE_REPOSITORY_ARGUMENT
def genesis_bundle(*, build_dir, validators, site_name): def genesis_bundle(*, build_dir, validators, site_name):
passphrase = os.environ.get("PEGLEG_PASSPHRASE")
salt = os.environ.get("PEGLEG_SALT")
encryption_key = os.environ.get("PROMENADE_ENCRYPTION_KEY") encryption_key = os.environ.get("PROMENADE_ENCRYPTION_KEY")
if passphrase:
passphrase = passphrase.encode()
if salt:
salt = salt.encode()
config.set_passphrase(passphrase)
config.set_salt(salt)
PeglegSecretManagement.check_environment()
bundle.build_genesis(build_dir, bundle.build_genesis(build_dir,
encryption_key, encryption_key,
validators, validators,

View File

@ -16,6 +16,8 @@
# context passing but will require a somewhat heavy code refactor. See: # context passing but will require a somewhat heavy code refactor. See:
# http://click.pocoo.org/5/commands/#nested-handling-and-contexts # http://click.pocoo.org/5/commands/#nested-handling-and-contexts
import os
from pegleg.engine import exceptions from pegleg.engine import exceptions
try: try:
@ -155,15 +157,16 @@ def set_rel_type_path(p):
GLOBAL_CONTEXT['type_path'] = p GLOBAL_CONTEXT['type_path'] = p
def set_passphrase(passphrase): def set_passphrase():
"""Set the passphrase for encryption and decryption.""" """Set the passphrase for encryption and decryption."""
passphrase = os.environ.get('PEGLEG_PASSPHRASE')
if not passphrase: if not passphrase:
raise exceptions.PassphraseNotFoundException() raise exceptions.PassphraseNotFoundException()
elif len(passphrase) < GLOBAL_CONTEXT['passphrase_min_length']: elif len(passphrase) < GLOBAL_CONTEXT['passphrase_min_length']:
raise exceptions.PassphraseInsufficientLengthException() raise exceptions.PassphraseInsufficientLengthException()
GLOBAL_CONTEXT['passphrase'] = passphrase GLOBAL_CONTEXT['passphrase'] = passphrase.encode()
def get_passphrase(): def get_passphrase():
@ -171,15 +174,16 @@ def get_passphrase():
return GLOBAL_CONTEXT['passphrase'] return GLOBAL_CONTEXT['passphrase']
def set_salt(salt): def set_salt():
"""Set the salt for encryption and decryption.""" """Set the salt for encryption and decryption."""
salt = os.environ.get('PEGLEG_SALT')
if not salt: if not salt:
raise exceptions.SaltNotFoundException() raise exceptions.SaltNotFoundException()
elif len(salt) < GLOBAL_CONTEXT['salt_min_length']: elif len(salt) < GLOBAL_CONTEXT['salt_min_length']:
raise exceptions.SaltInsufficientLengthException() raise exceptions.SaltInsufficientLengthException()
GLOBAL_CONTEXT['salt'] = salt GLOBAL_CONTEXT['salt'] = salt.encode()
def get_salt(): def get_salt():

View File

@ -13,8 +13,6 @@
# limitations under the License. # limitations under the License.
import logging import logging
import os
import re
import click import click
import yaml import yaml
@ -27,16 +25,17 @@ from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument as PeglegManagedSecret PeglegManagedSecretsDocument as PeglegManagedSecret
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
PASSPHRASE_PATTERN = '^.{24,}$' # nosec (alexanderhughes)
ENV_PASSPHRASE = 'PEGLEG_PASSPHRASE' # nosec (alexanderhughes)
ENV_SALT = 'PEGLEG_SALT'
class PeglegSecretManagement(object): class PeglegSecretManagement(object):
"""An object to handle operations on of a pegleg managed file.""" """An object to handle operations on of a pegleg managed file."""
def __init__(self, file_path=None, docs=None, generated=False, def __init__(self,
catalog=None, author=None): file_path=None,
docs=None,
generated=False,
catalog=None,
author=None):
""" """
Read the source file and the environment data needed to wrap and Read the source file and the environment data needed to wrap and
process the file documents as pegleg managed document. process the file documents as pegleg managed document.
@ -44,6 +43,12 @@ class PeglegSecretManagement(object):
provided. provided.
""" """
config.set_passphrase()
self.passphrase = config.get_passphrase()
config.set_salt()
self.salt = config.get_salt()
if all([file_path, docs]) or not any([file_path, docs]): if all([file_path, docs]) or not any([file_path, docs]):
raise ValueError('Either `file_path` or `docs` must be ' raise ValueError('Either `file_path` or `docs` must be '
'specified.') 'specified.')
@ -52,17 +57,17 @@ class PeglegSecretManagement(object):
raise ValueError("If the document is generated, author and " raise ValueError("If the document is generated, author and "
"catalog must be specified.") "catalog must be specified.")
self.check_environment()
self.file_path = file_path self.file_path = file_path
self.documents = list() self.documents = list()
self._generated = generated self._generated = generated
if docs: if docs:
for doc in docs: for doc in docs:
self.documents.append(PeglegManagedSecret(doc, self.documents.append(
generated=generated, PeglegManagedSecret(doc,
catalog=catalog, generated=generated,
author=author)) catalog=catalog,
author=author))
else: else:
self.file_path = file_path self.file_path = file_path
for doc in files.read(file_path): for doc in files.read(file_path):
@ -70,18 +75,6 @@ class PeglegSecretManagement(object):
self._author = author self._author = author
if config.get_passphrase() and config.get_salt():
self.passphrase = config.get_passphrase()
self.salt = config.get_salt()
elif config.get_passphrase() or config.get_salt():
raise ValueError("ERROR: Pegleg configuration must either have "
"both a passphrase and a salt or neither.")
else:
self.passphrase = os.environ.get(ENV_PASSPHRASE).encode()
self.salt = os.environ.get(ENV_SALT).encode()
config.set_passphrase(self.passphrase)
config.set_salt(self.salt)
def __iter__(self): def __iter__(self):
""" """
Make the secret management object iterable Make the secret management object iterable
@ -89,28 +82,6 @@ class PeglegSecretManagement(object):
""" """
return (doc.pegleg_document for doc in self.documents) return (doc.pegleg_document for doc in self.documents)
@staticmethod
def check_environment():
"""
Validate required environment variables for encryption or decryption.
:return None
:raises click.ClickException: If environment validation should fail.
"""
# Verify that passphrase environment variable is defined and is longer
# than 24 characters.
if not os.environ.get(ENV_PASSPHRASE) or not re.match(
PASSPHRASE_PATTERN, os.environ.get(ENV_PASSPHRASE)):
raise click.ClickException(
'Environment variable {} is not defined or '
'is not at least 24-character long.'.format(ENV_PASSPHRASE))
if not os.environ.get(ENV_SALT):
raise click.ClickException(
'Environment variable {} is not defined or '
'is an empty string.'.format(ENV_SALT))
def encrypt_secrets(self, save_path): def encrypt_secrets(self, save_path):
""" """
Wrap and encrypt the secrets documents included in the input file, Wrap and encrypt the secrets documents included in the input file,
@ -166,8 +137,7 @@ class PeglegSecretManagement(object):
secret_doc = doc.get_secret() secret_doc = doc.get_secret()
if type(secret_doc) != bytes: if type(secret_doc) != bytes:
secret_doc = secret_doc.encode() secret_doc = secret_doc.encode()
doc.set_secret( doc.set_secret(encrypt(secret_doc, self.passphrase, self.salt))
encrypt(secret_doc, self.passphrase, self.salt))
doc.set_encrypted(self._author) doc.set_encrypted(self._author)
encrypted_docs = True encrypted_docs = True
doc_list.append(doc.pegleg_document) doc_list.append(doc.pegleg_document)
@ -180,11 +150,10 @@ class PeglegSecretManagement(object):
secrets = self.get_decrypted_secrets() secrets = self.get_decrypted_secrets()
return yaml.safe_dump_all( return yaml.safe_dump_all(secrets,
secrets, explicit_start=True,
explicit_start=True, explicit_end=True,
explicit_end=True, default_flow_style=False)
default_flow_style=False)
def get_decrypted_secrets(self): def get_decrypted_secrets(self):
""" """

View File

@ -24,8 +24,6 @@ from pegleg.engine import bundle
from pegleg.engine.exceptions import GenesisBundleEncryptionException from pegleg.engine.exceptions import GenesisBundleEncryptionException
from pegleg.engine.exceptions import GenesisBundleGenerateException from pegleg.engine.exceptions import GenesisBundleGenerateException
from pegleg.engine.util import files from pegleg.engine.util import files
from pegleg.engine.util.pegleg_secret_management import ENV_PASSPHRASE
from pegleg.engine.util.pegleg_secret_management import ENV_SALT
from tests.unit.fixtures import temp_path from tests.unit.fixtures import temp_path
@ -90,8 +88,8 @@ data: ABAgagajajkb839215387
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_SALT': 'MySecretSalt1234567890]['
}) })
def test_no_encryption_key(temp_path): def test_no_encryption_key(temp_path):
# Write the test data to temp file # Write the test data to temp file
@ -118,8 +116,8 @@ def test_no_encryption_key(temp_path):
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_SALT': 'MySecretSalt1234567890]['
}) })
def test_failed_deckhand_validation(temp_path): def test_failed_deckhand_validation(temp_path):
# Write the test data to temp file # Write the test data to temp file

View File

@ -28,8 +28,6 @@ from pegleg.engine.util.cryptostring import CryptoString
from pegleg.engine.util import encryption from pegleg.engine.util import encryption
from pegleg.engine import util from pegleg.engine import util
import pegleg import pegleg
from pegleg.engine.util.pegleg_secret_management import ENV_PASSPHRASE
from pegleg.engine.util.pegleg_secret_management import ENV_SALT
TEST_PASSPHRASES_CATALOG = yaml.safe_load(""" TEST_PASSPHRASES_CATALOG = yaml.safe_load("""
--- ---
@ -166,8 +164,8 @@ TEST_BASE64_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_BASE64_PASSPHRASES_CATA
return_value=[ return_value=[
'cicd_site_repo/site/cicd/passphrases/passphrase-catalog.yaml', ]) 'cicd_site_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['})
def test_generate_passphrases(*_): def test_generate_passphrases(*_):
_dir = tempfile.mkdtemp() _dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
@ -239,8 +237,8 @@ def test_generate_passphrases_exception(capture):
return_value=[ return_value=[
'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ]) 'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['})
def test_global_passphrase_catalog(*_): def test_global_passphrase_catalog(*_):
_dir = tempfile.mkdtemp() _dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
@ -288,8 +286,8 @@ def test_global_passphrase_catalog(*_):
return_value=[ return_value=[
'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ]) 'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['})
def test_base64_passphrase_catalog(*_): def test_base64_passphrase_catalog(*_):
_dir = tempfile.mkdtemp() _dir = tempfile.mkdtemp()
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
@ -313,8 +311,8 @@ def test_base64_passphrase_catalog(*_):
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890]['}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['})
def test_crypt_coding_flow(): def test_crypt_coding_flow():
cs_util = CryptoString() cs_util = CryptoString()
orig_passphrase = cs_util.get_crypto_string() orig_passphrase = cs_util.get_crypto_string()

View File

@ -24,13 +24,12 @@ import yaml
from pegleg import config from pegleg import config
from pegleg.engine.catalog.pki_generator import PKIGenerator from pegleg.engine.catalog.pki_generator import PKIGenerator
from pegleg.engine.catalog import pki_utility from pegleg.engine.catalog import pki_utility
from pegleg.engine import exceptions
from pegleg.engine import secrets from pegleg.engine import secrets
from pegleg.engine.util import encryption as crypt, catalog, git from pegleg.engine.util import encryption as crypt, catalog, git
from pegleg.engine.util import files from pegleg.engine.util import files
from pegleg.engine.util.pegleg_managed_document import \ from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument PeglegManagedSecretsDocument
from pegleg.engine.util.pegleg_secret_management import ENV_PASSPHRASE
from pegleg.engine.util.pegleg_secret_management import ENV_SALT
from pegleg.engine.util.pegleg_secret_management import PeglegSecretManagement from pegleg.engine.util.pegleg_secret_management import PeglegSecretManagement
from tests.unit import test_utils from tests.unit import test_utils
from tests.unit.fixtures import temp_path, create_tmp_deployment_files, \ from tests.unit.fixtures import temp_path, create_tmp_deployment_files, \
@ -72,19 +71,31 @@ def test_encrypt_and_decrypt():
assert data != enc3 assert data != enc3
@mock.patch.dict(os.environ, { @mock.patch.dict(
ENV_PASSPHRASE: 'aShortPassphrase', os.environ, {
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_PASSPHRASE': 'aShortPassphrase',
}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_short_passphrase(): def test_short_passphrase():
with pytest.raises(click.ClickException, with pytest.raises(exceptions.PassphraseInsufficientLengthException):
match=r'.*is not at least 24-character long.*'):
PeglegSecretManagement(file_path='file_path', author='test_author') PeglegSecretManagement(file_path='file_path', author='test_author')
@mock.patch.dict(os.environ, { @mock.patch.dict(
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', os.environ, {
ENV_SALT: 'MySecretSalt1234567890]['}) 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'aShortSalt'
})
def test_short_salt():
with pytest.raises(exceptions.SaltInsufficientLengthException):
PeglegSecretManagement(file_path='file_path', author='test_author')
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_secret_encrypt_and_decrypt(create_tmp_deployment_files, tmpdir): def test_secret_encrypt_and_decrypt(create_tmp_deployment_files, tmpdir):
site_dir = tmpdir.join("deployment_files", "site", "cicd") site_dir = tmpdir.join("deployment_files", "site", "cicd")
passphrase_doc = """--- passphrase_doc = """---
@ -98,8 +109,7 @@ metadata:
layer: {2} layer: {2}
data: {0}-password data: {0}-password
... ...
""".format("cicd-passphrase-encrypted", "encrypted", """.format("cicd-passphrase-encrypted", "encrypted", "site")
"site")
with open(os.path.join(str(site_dir), 'secrets', with open(os.path.join(str(site_dir), 'secrets',
'passphrases', 'passphrases',
'cicd-passphrase-encrypted.yaml'), "w") \ 'cicd-passphrase-encrypted.yaml'), "w") \
@ -113,12 +123,19 @@ data: {0}-password
encrypted_files = listdir(save_location_str) encrypted_files = listdir(save_location_str)
assert len(encrypted_files) > 0 assert len(encrypted_files) > 0
encrypted_path = str(save_location.join("site/cicd/secrets/passphrases/" encrypted_path = str(
"cicd-passphrase-encrypted.yaml")) save_location.join("site/cicd/secrets/passphrases/"
"cicd-passphrase-encrypted.yaml"))
decrypted = secrets.decrypt(encrypted_path) decrypted = secrets.decrypt(encrypted_path)
assert yaml.safe_load(decrypted[encrypted_path]) == yaml.safe_load(passphrase_doc) assert yaml.safe_load(
decrypted[encrypted_path]) == yaml.safe_load(passphrase_doc)
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_pegleg_secret_management_constructor(): def test_pegleg_secret_management_constructor():
test_data = yaml.safe_load(TEST_DATA) test_data = yaml.safe_load(TEST_DATA)
doc = PeglegManagedSecretsDocument(test_data) doc = PeglegManagedSecretsDocument(test_data)
@ -126,6 +143,11 @@ def test_pegleg_secret_management_constructor():
assert not doc.is_encrypted() assert not doc.is_encrypted()
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_pegleg_secret_management_constructor_with_invalid_arguments(): def test_pegleg_secret_management_constructor_with_invalid_arguments():
with pytest.raises(ValueError) as err_info: with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(file_path=None, docs=None) PeglegSecretManagement(file_path=None, docs=None)
@ -136,31 +158,32 @@ def test_pegleg_secret_management_constructor_with_invalid_arguments():
assert 'Either `file_path` or `docs` must be specified.' in str( assert 'Either `file_path` or `docs` must be specified.' in str(
err_info.value) err_info.value)
with pytest.raises(ValueError) as err_info: with pytest.raises(ValueError) as err_info:
PeglegSecretManagement( PeglegSecretManagement(file_path='file_path',
file_path='file_path', generated=True, author='test_author') generated=True,
author='test_author')
assert 'If the document is generated, author and catalog must be ' \ assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value) 'specified.' in str(err_info.value)
with pytest.raises(ValueError) as err_info: with pytest.raises(ValueError) as err_info:
PeglegSecretManagement( PeglegSecretManagement(docs=['doc'], generated=True)
docs=['doc'], generated=True)
assert 'If the document is generated, author and catalog must be ' \ assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value) 'specified.' in str(err_info.value)
with pytest.raises(ValueError) as err_info: with pytest.raises(ValueError) as err_info:
PeglegSecretManagement( PeglegSecretManagement(docs=['doc'],
docs=['doc'], generated=True, author='test_author') generated=True,
author='test_author')
assert 'If the document is generated, author and catalog must be ' \ assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value) 'specified.' in str(err_info.value)
with pytest.raises(ValueError) as err_info: with pytest.raises(ValueError) as err_info:
PeglegSecretManagement( PeglegSecretManagement(docs=['doc'], generated=True, catalog='catalog')
docs=['doc'], generated=True, catalog='catalog')
assert 'If the document is generated, author and catalog must be ' \ assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value) 'specified.' in str(err_info.value)
@mock.patch.dict(os.environ, { @mock.patch.dict(
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', os.environ, {
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_pegleg_secret_management_double_encrypt(): def test_pegleg_secret_management_double_encrypt():
encrypted_doc = PeglegSecretManagement( encrypted_doc = PeglegSecretManagement(
docs=[yaml.safe_load(TEST_DATA)]).get_encrypted_secrets()[0][0] docs=[yaml.safe_load(TEST_DATA)]).get_encrypted_secrets()[0][0]
@ -169,10 +192,11 @@ def test_pegleg_secret_management_double_encrypt():
assert encrypted_doc == encrypted_doc_2 assert encrypted_doc == encrypted_doc_2
@mock.patch.dict(os.environ, { @mock.patch.dict(
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', os.environ, {
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_encrypt_decrypt_using_file_path(temp_path): def test_encrypt_decrypt_using_file_path(temp_path):
# write the test data to temp file # write the test data to temp file
test_data = list(yaml.safe_load_all(TEST_DATA)) test_data = list(yaml.safe_load_all(TEST_DATA))
@ -188,29 +212,27 @@ def test_encrypt_decrypt_using_file_path(temp_path):
assert doc.data['encrypted']['by'] == 'test_author' assert doc.data['encrypted']['by'] == 'test_author'
# decrypt documents and validate that they were decrypted # decrypt documents and validate that they were decrypted
doc_mgr = PeglegSecretManagement( doc_mgr = PeglegSecretManagement(file_path=file_path, author='test_author')
file_path=file_path, author='test_author')
doc_mgr.encrypt_secrets(save_path) doc_mgr.encrypt_secrets(save_path)
# read back the encrypted file # read back the encrypted file
doc_mgr = PeglegSecretManagement( doc_mgr = PeglegSecretManagement(file_path=save_path, author='test_author')
file_path=save_path, author='test_author')
decrypted_data = doc_mgr.get_decrypted_secrets() decrypted_data = doc_mgr.get_decrypted_secrets()
assert test_data[0]['data'] == decrypted_data[0]['data'] assert test_data[0]['data'] == decrypted_data[0]['data']
assert test_data[0]['schema'] == decrypted_data[0]['schema'] assert test_data[0]['schema'] == decrypted_data[0]['schema']
@mock.patch.dict(os.environ, { @mock.patch.dict(
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', os.environ, {
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
}) 'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_encrypt_decrypt_using_docs(temp_path): def test_encrypt_decrypt_using_docs(temp_path):
# write the test data to temp file # write the test data to temp file
test_data = list(yaml.safe_load_all(TEST_DATA)) test_data = list(yaml.safe_load_all(TEST_DATA))
save_path = os.path.join(temp_path, 'encrypted_secrets_file.yaml') save_path = os.path.join(temp_path, 'encrypted_secrets_file.yaml')
# encrypt documents and validate that they were encrypted # encrypt documents and validate that they were encrypted
doc_mgr = PeglegSecretManagement( doc_mgr = PeglegSecretManagement(docs=test_data, author='test_author')
docs=test_data, author='test_author')
doc_mgr.encrypt_secrets(save_path) doc_mgr.encrypt_secrets(save_path)
doc = doc_mgr.documents[0] doc = doc_mgr.documents[0]
assert doc.is_encrypted() assert doc.is_encrypted()
@ -221,8 +243,7 @@ def test_encrypt_decrypt_using_docs(temp_path):
encrypted_data = list(yaml.safe_load_all(stream)) encrypted_data = list(yaml.safe_load_all(stream))
# decrypt documents and validate that they were decrypted # decrypt documents and validate that they were decrypted
doc_mgr = PeglegSecretManagement( doc_mgr = PeglegSecretManagement(docs=encrypted_data, author='test_author')
docs=encrypted_data, author='test_author')
decrypted_data = doc_mgr.get_decrypted_secrets() decrypted_data = doc_mgr.get_decrypted_secrets()
assert test_data[0]['data'] == decrypted_data[0]['data'] assert test_data[0]['data'] == decrypted_data[0]['data']
assert test_data[0]['schema'] == decrypted_data[0]['schema'] assert test_data[0]['schema'] == decrypted_data[0]['schema']
@ -232,21 +253,21 @@ def test_encrypt_decrypt_using_docs(temp_path):
'metadata']['storagePolicy'] 'metadata']['storagePolicy']
@pytest.mark.skipif( @pytest.mark.skipif(not pki_utility.PKIUtility.cfssl_exists(),
not pki_utility.PKIUtility.cfssl_exists(), reason='cfssl must be installed to execute these tests')
reason='cfssl must be installed to execute these tests') @mock.patch.dict(
@mock.patch.dict(os.environ, { os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_SALT': 'MySecretSalt1234567890]['
}) })
def test_generate_pki_using_local_repo_path(create_tmp_deployment_files): def test_generate_pki_using_local_repo_path(create_tmp_deployment_files):
"""Validates ``generate-pki`` action using local repo path.""" """Validates ``generate-pki`` action using local repo path."""
# Scenario: # Scenario:
# #
# 1) Generate PKI using local repo path # 1) Generate PKI using local repo path
repo_path = str(git.git_handler(TEST_PARAMS["repo_url"], repo_path = str(
ref=TEST_PARAMS["repo_rev"])) git.git_handler(TEST_PARAMS["repo_url"], ref=TEST_PARAMS["repo_rev"]))
with mock.patch.dict(config.GLOBAL_CONTEXT, {"site_repo": repo_path}): with mock.patch.dict(config.GLOBAL_CONTEXT, {"site_repo": repo_path}):
pki_generator = PKIGenerator(duration=365, pki_generator = PKIGenerator(duration=365,
sitename=TEST_PARAMS["site_name"]) sitename=TEST_PARAMS["site_name"])
@ -259,17 +280,17 @@ def test_generate_pki_using_local_repo_path(create_tmp_deployment_files):
assert list(result), "%s file is empty" % generated_file.name assert list(result), "%s file is empty" % generated_file.name
@pytest.mark.skipif( @pytest.mark.skipif(not pki_utility.PKIUtility.cfssl_exists(),
not pki_utility.PKIUtility.cfssl_exists(), reason='cfssl must be installed to execute these tests')
reason='cfssl must be installed to execute these tests') @mock.patch.dict(
@mock.patch.dict(os.environ, { os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_SALT': 'MySecretSalt1234567890]['
}) })
def test_check_expiry(create_tmp_deployment_files): def test_check_expiry(create_tmp_deployment_files):
""" Validates check_expiry """ """ Validates check_expiry """
repo_path = str(git.git_handler(TEST_PARAMS["repo_url"], repo_path = str(
ref=TEST_PARAMS["repo_rev"])) git.git_handler(TEST_PARAMS["repo_url"], ref=TEST_PARAMS["repo_rev"]))
with mock.patch.dict(config.GLOBAL_CONTEXT, {"site_repo": repo_path}): with mock.patch.dict(config.GLOBAL_CONTEXT, {"site_repo": repo_path}):
pki_generator = PKIGenerator(duration=365, pki_generator = PKIGenerator(duration=365,
sitename=TEST_PARAMS["site_name"]) sitename=TEST_PARAMS["site_name"])

View File

@ -19,8 +19,6 @@ import pytest
import yaml import yaml
from pegleg.engine import util from pegleg.engine import util
from pegleg.engine.util.pegleg_secret_management import ENV_PASSPHRASE
from pegleg.engine.util.pegleg_secret_management import ENV_SALT
from pegleg.engine.util.shipyard_helper import ShipyardHelper from pegleg.engine.util.shipyard_helper import ShipyardHelper
from pegleg.engine.util.shipyard_helper import ShipyardClient from pegleg.engine.util.shipyard_helper import ShipyardClient
@ -138,8 +136,8 @@ def test_shipyard_helper_init_():
@mock.patch.object(ShipyardHelper, 'formatted_response_handler', @mock.patch.object(ShipyardHelper, 'formatted_response_handler',
autospec=True, return_value=None) autospec=True, return_value=None)
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_SALT': 'MySecretSalt1234567890]['
}) })
def test_upload_documents(*args): def test_upload_documents(*args):
""" Tests upload document """ """ Tests upload document """
@ -171,8 +169,8 @@ def test_upload_documents(*args):
@mock.patch.object(ShipyardHelper, 'formatted_response_handler', @mock.patch.object(ShipyardHelper, 'formatted_response_handler',
autospec=True, return_value=None) autospec=True, return_value=None)
@mock.patch.dict(os.environ, { @mock.patch.dict(os.environ, {
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt1234567890][' 'PEGLEG_SALT': 'MySecretSalt1234567890]['
}) })
def test_upload_documents_fail(*args): def test_upload_documents_fail(*args):
""" Tests Document upload error """ """ Tests Document upload error """