pegleg/tests/unit/engine/test_secrets.py
Ian H. Pittwood fff70ad861 Refactors pegleg CLI to use single commands
Debugging pegleg can currently be difficult and the Click CLI does not
easily allow debuggers like pdb or PyCharm to use breakpoints. By moving
all CLI command calls into singular functions, we can easily create an
"if __name__ == '__main__'" entry point to call these functions and
investigate any bugs that may arise.

We also gain the ability to reuse more portions of our code by
refactoring these methods.

Change-Id: Ia9739931273eb6458f82dbb7e702a505ae397ae3
2019-12-17 19:32:28 +00:00

487 lines
17 KiB
Python

# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from os import listdir
from unittest import mock
import pytest
import yaml
from pegleg import config
from pegleg.engine.catalog.pki_generator import PKIGenerator
from pegleg.engine.catalog import pki_utility
from pegleg.engine import exceptions
from pegleg.engine import secrets
from pegleg.engine.util import encryption as crypt, git
from pegleg.engine.util import files
from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument
from pegleg.engine.util.pegleg_secret_management import PeglegSecretManagement
from tests.unit import test_utils
from tests.unit.cli.test_commands import TEST_PARAMS
TEST_DATA = """
---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: osh_addons_keystone_ranger-agent_password
layeringDefinition:
abstract: false
layer: site
storagePolicy: encrypted
data: 512363f37eab654313991174aef9f867d
...
"""
TEST_GLOBAL_DATA = """
---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: osh_addons_keystone_ranger-agent_password
layeringDefinition:
abstract: false
layer: global
storagePolicy: encrypted
data: 512363f37eab654313991174aef9f867d
...
"""
GLOBAL_PASSPHRASE_SALT_DOC = """
---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: global_passphrase
layeringDefinition:
abstract: false
layer: site
storagePolicy: encrypted
data: TbKYNtM@3gXpL=AFLAwU?&Ey
...
---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: global_salt
layeringDefinition:
abstract: false
layer: site
storagePolicy: encrypted
data: h3=DQ#GNYEuCvybgpfW7ZxAP
...
"""
GLOBAL_SALT_DOC = """
---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: global_salt
layeringDefinition:
abstract: false
layer: site
storagePolicy: encrypted
data: h3=DQ#GNYEuCvybgpfW7ZxAP
...
"""
def test_encrypt_and_decrypt():
data = test_utils.rand_name(
"this is an example of un-encrypted "
"data.", "pegleg").encode()
passphrase = test_utils.rand_name("passphrase1", "pegleg").encode()
salt = test_utils.rand_name("salt1", "pegleg").encode()
enc1 = crypt.encrypt(data, passphrase, salt)
dec1 = crypt.decrypt(enc1, passphrase, salt)
assert data == dec1
enc2 = crypt.encrypt(dec1, passphrase, salt)
dec2 = crypt.decrypt(enc2, passphrase, salt)
assert data == dec2
passphrase2 = test_utils.rand_name("passphrase2", "pegleg").encode()
salt2 = test_utils.rand_name("salt2", "pegleg").encode()
enc3 = crypt.encrypt(dec2, passphrase2, salt2)
dec3 = crypt.decrypt(enc3, passphrase2, salt2)
assert data == dec3
assert data != enc3
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'aShortPassphrase',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_short_passphrase():
with pytest.raises(exceptions.PassphraseInsufficientLengthException):
PeglegSecretManagement(file_path='file_path', author='test_author')
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'aShortSalt'
})
def test_short_salt():
with pytest.raises(exceptions.SaltInsufficientLengthException):
PeglegSecretManagement(file_path='file_path', author='test_author')
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_secret_encrypt_and_decrypt(temp_deployment_files, tmpdir):
site_dir = tmpdir.join("deployment_files", "site", "cicd")
passphrase_doc = """---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: {0}
storagePolicy: {1}
layeringDefinition:
abstract: False
layer: {2}
data: {0}-password
...
""".format("cicd-passphrase-encrypted", "encrypted", "site")
with open(os.path.join(str(site_dir), 'secrets',
'passphrases',
'cicd-passphrase-encrypted.yaml'), "w") \
as outfile:
outfile.write(passphrase_doc)
save_location = tmpdir.mkdir("encrypted_files")
save_location_str = str(save_location)
secrets.encrypt(save_location_str, "pytest", "cicd")
encrypted_files = listdir(save_location_str)
assert len(encrypted_files) > 0
encrypted_path = str(
save_location.join(
"site/cicd/secrets/passphrases/"
"cicd-passphrase-encrypted.yaml"))
decrypted = secrets.decrypt(encrypted_path)
assert yaml.safe_load(
decrypted[encrypted_path]) == yaml.safe_load(passphrase_doc)
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_pegleg_secret_management_constructor():
test_data = yaml.safe_load(TEST_DATA)
doc = PeglegManagedSecretsDocument(test_data)
assert doc.is_storage_policy_encrypted()
assert not doc.is_encrypted()
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_pegleg_secret_management_constructor_with_invalid_arguments():
with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(file_path=None, docs=None)
assert 'Either `file_path` or `docs` must be specified.' in str(
err_info.value)
with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(file_path='file_path', docs=['doc1'])
assert 'Either `file_path` or `docs` must be specified.' in str(
err_info.value)
with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(
file_path='file_path', generated=True, author='test_author')
assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value)
with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(docs=['doc'], generated=True)
assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value)
with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(
docs=['doc'], generated=True, author='test_author')
assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value)
with pytest.raises(ValueError) as err_info:
PeglegSecretManagement(docs=['doc'], generated=True, catalog='catalog')
assert 'If the document is generated, author and catalog must be ' \
'specified.' in str(err_info.value)
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_pegleg_secret_management_double_encrypt():
encrypted_doc = PeglegSecretManagement(
docs=[yaml.safe_load(TEST_DATA)]).get_encrypted_secrets()[0][0]
encrypted_doc_2 = PeglegSecretManagement(
docs=[encrypted_doc]).get_encrypted_secrets()[0][0]
assert encrypted_doc == encrypted_doc_2
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_encrypt_decrypt_using_file_path(tmpdir):
# write the test data to temp file
test_data = list(yaml.safe_load_all(TEST_DATA))
file_path = os.path.join(tmpdir, 'secrets_file.yaml')
files.write(test_data, file_path)
save_path = os.path.join(tmpdir, 'encrypted_secrets_file.yaml')
# encrypt documents and validate that they were encrypted
doc_mgr = PeglegSecretManagement(file_path=file_path, author='test_author')
doc_mgr.encrypt_secrets(save_path)
doc = doc_mgr.documents[0]
assert doc.is_encrypted()
assert doc.data['encrypted']['by'] == 'test_author'
# decrypt documents and validate that they were decrypted
doc_mgr = PeglegSecretManagement(file_path=file_path, author='test_author')
doc_mgr.encrypt_secrets(save_path)
# read back the encrypted file
doc_mgr = PeglegSecretManagement(file_path=save_path, author='test_author')
decrypted_data = doc_mgr.get_decrypted_secrets()
assert test_data[0]['data'] == decrypted_data[0]['data']
assert test_data[0]['schema'] == decrypted_data[0]['schema']
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_encrypt_decrypt_using_docs(tmpdir):
# write the test data to temp file
test_data = list(yaml.safe_load_all(TEST_DATA))
save_path = os.path.join(tmpdir, 'encrypted_secrets_file.yaml')
# encrypt documents and validate that they were encrypted
doc_mgr = PeglegSecretManagement(docs=test_data, author='test_author')
doc_mgr.encrypt_secrets(save_path)
doc = doc_mgr.documents[0]
assert doc.is_encrypted()
assert doc.data['encrypted']['by'] == 'test_author'
# read back the encrypted file
with open(save_path) as stream:
encrypted_data = list(yaml.safe_load_all(stream))
# decrypt documents and validate that they were decrypted
doc_mgr = PeglegSecretManagement(docs=encrypted_data, author='test_author')
decrypted_data = doc_mgr.get_decrypted_secrets()
assert test_data[0]['data'] == decrypted_data[0]['data']
assert test_data[0]['schema'] == decrypted_data[0]['schema']
assert test_data[0]['metadata']['name'] == decrypted_data[0]['metadata'][
'name']
assert test_data[0]['metadata']['storagePolicy'] == decrypted_data[0][
'metadata']['storagePolicy']
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_generate_pki_using_local_repo_path(temp_deployment_files):
"""Validates ``generate-pki`` action using local repo path."""
# Scenario:
#
# 1) Generate PKI using local repo path
repo_path = str(
git.git_handler(TEST_PARAMS["repo_url"], ref=TEST_PARAMS["repo_rev"]))
with mock.patch.dict(config.GLOBAL_CONTEXT, {"site_repo": repo_path}):
pki_generator = PKIGenerator(
duration=365, sitename=TEST_PARAMS["site_name"])
generated_files = pki_generator.generate()
assert len(generated_files), 'No secrets were generated'
for generated_file in generated_files:
with open(generated_file, 'r') as f:
result = yaml.safe_load_all(f) # Validate valid YAML.
assert list(result), "%s file is empty" % generated_file.name
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_check_expiry(temp_deployment_files):
""" Validates check_expiry """
repo_path = str(
git.git_handler(TEST_PARAMS["repo_url"], ref=TEST_PARAMS["repo_rev"]))
with mock.patch.dict(config.GLOBAL_CONTEXT, {"site_repo": repo_path}):
pki_generator = PKIGenerator(
duration=365, sitename=TEST_PARAMS["site_name"])
generated_files = pki_generator.generate()
pki_util = pki_utility.PKIUtility(duration=0)
assert len(generated_files), 'No secrets were generated'
for generated_file in generated_files:
if "certificate" not in generated_file:
continue
with open(generated_file, 'r') as f:
results = yaml.safe_load_all(f) # Validate valid YAML.
results = PeglegSecretManagement(
docs=results).get_decrypted_secrets()
for result in results:
if result['schema'] == \
"deckhand/Certificate/v1":
cert = result['data']
cert_info = pki_util.check_expiry(cert)
assert cert_info['expired'] is False, \
"%s is expired/expiring on %s" % \
(result['metadata']['name'],
cert_info['expiry_date'])
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_get_global_creds_missing_creds(temp_deployment_files, tmpdir):
# Create site files
site_dir = tmpdir.join("deployment_files", "site", "cicd")
save_location = tmpdir.mkdir("encrypted_site_files")
save_location_str = str(save_location)
# Capture global credentials, verify they are not present and we default
# to site credentials instead.
config.set_global_enc_keys("cicd")
passphrase, salt = secrets.get_global_creds("cicd")
assert passphrase.decode() == 'ytrr89erARAiPE34692iwUMvWqqBvC'
assert salt.decode() == 'MySecretSalt1234567890]['
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_get_global_creds_missing_pass(temp_deployment_files, tmpdir):
# Create site files
site_dir = tmpdir.join("deployment_files", "site", "cicd")
# Create global salt file
with open(os.path.join(str(site_dir), 'secrets', 'passphrases',
'cicd-global-salt-encrypted.yaml'), "w") as outfile:
outfile.write(GLOBAL_SALT_DOC)
save_location = tmpdir.mkdir("encrypted_site_files")
save_location_str = str(save_location)
# Demonstrate that encryption fails when only the global salt or
# only the global passphrase are present among the site files.
with pytest.raises(exceptions.GlobalCredentialsNotFound):
config.set_global_enc_keys("cicd")
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_get_global_creds(temp_deployment_files, tmpdir):
# Create site files
site_dir = tmpdir.join("deployment_files", "site", "cicd")
# Create global passphrase and salt file
with open(os.path.join(str(site_dir), 'secrets',
'passphrases',
'cicd-global-passphrase-encrypted.yaml'), "w") \
as outfile:
outfile.write(GLOBAL_PASSPHRASE_SALT_DOC)
save_location = tmpdir.mkdir("encrypted_site_files")
save_location_str = str(save_location)
# Encrypt the global passphrase and salt file using site passphrase/salt
config.set_global_enc_keys("cicd")
secrets.encrypt(save_location_str, "pytest", "cicd")
encrypted_files = listdir(save_location_str)
assert len(encrypted_files) > 0
# Capture global credentials, verify we have the right ones
passphrase, salt = secrets.get_global_creds("cicd")
assert passphrase.decode() == "TbKYNtM@3gXpL=AFLAwU?&Ey"
assert salt.decode() == "h3=DQ#GNYEuCvybgpfW7ZxAP"
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_global_encrypt_decrypt(temp_deployment_files, tmpdir):
# Create site files
site_dir = tmpdir.join("deployment_files", "site", "cicd")
# Create and encrypt global passphrase and salt file using site keys
with open(os.path.join(str(site_dir), 'secrets',
'passphrases',
'cicd-global-passphrase-encrypted.yaml'), "w") \
as outfile:
outfile.write(GLOBAL_PASSPHRASE_SALT_DOC)
save_location = tmpdir.mkdir("encrypted_site_files")
save_location_str = str(save_location)
# Encrypt the global passphrase and salt file using site passphrase/salt
config.set_global_enc_keys("cicd")
secrets.encrypt(save_location_str, "pytest", "cicd")
# Create and encrypt a global type document
global_doc_path = os.path.join(
str(site_dir), 'secrets', 'passphrases', 'globally_encrypted_doc.yaml')
with open(global_doc_path, "w") as outfile:
outfile.write(TEST_GLOBAL_DATA)
# encrypt documents and validate that they were encrypted
doc_mgr = PeglegSecretManagement(
file_path=global_doc_path, author='pytest', site_name='cicd')
doc_mgr.encrypt_secrets(global_doc_path)
doc = doc_mgr.documents[0]
assert doc.is_encrypted()
assert doc.data['encrypted']['by'] == 'pytest'
doc_mgr = PeglegSecretManagement(
file_path=global_doc_path, author='pytest', site_name='cicd')
decrypted_data = doc_mgr.get_decrypted_secrets()
test_data = list(yaml.safe_load_all(TEST_GLOBAL_DATA))
assert test_data[0]['data'] == decrypted_data[0]['data']
assert test_data[0]['schema'] == decrypted_data[0]['schema']