bugfix for global encrypt/decrypt

This patch updates layer for wrapped documents to preserve original
layer.  Previously all encrypted documents had site layer.

Update encrypt/decrypt logic when determining global keys.

Update units tests.

Change-Id: I447aeaea08a4514655fcabfc7077b6d4b282e27f
This commit is contained in:
Alexander Hughes 2019-09-19 12:27:20 -05:00 committed by Alexander Hughes
parent 438db81d27
commit f56d20a2da
6 changed files with 60 additions and 34 deletions

View File

@ -421,7 +421,7 @@ def upload(
ctx.obj['context_marker'] = str(context_marker)
ctx.obj['site_name'] = site_name
ctx.obj['collection'] = collection
config.set_global_enc_keys(site_name)
click.echo(ShipyardHelper(ctx, buffer_mode).upload_documents())
@ -469,6 +469,7 @@ def generate_pki(site_name, author, days, regenerate_all):
"""
engine.repository.process_repositories(site_name, overwrite_existing=True)
config.set_global_enc_keys(site_name)
pkigenerator = catalog.pki_generator.PKIGenerator(
site_name, author=author, duration=days, regenerate_all=regenerate_all)
output_paths = pkigenerator.generate()
@ -525,6 +526,7 @@ def wrap_secret_cli(
"""
engine.repository.process_repositories(site_name, overwrite_existing=True)
config.set_global_enc_keys(site_name)
wrap_secret(
author,
filename,
@ -555,6 +557,8 @@ def wrap_secret_cli(
@SITE_REPOSITORY_ARGUMENT
def genesis_bundle(*, build_dir, validators, site_name):
encryption_key = os.environ.get("PROMENADE_ENCRYPTION_KEY")
config.set_global_enc_keys(site_name)
bundle.build_genesis(
build_dir, encryption_key, validators,
logging.DEBUG == LOG.getEffectiveLevel(), site_name)
@ -575,6 +579,7 @@ def check_pki_certs(site_name, days):
"""Check PKI certificates of a site for expiration."""
engine.repository.process_repositories(site_name, overwrite_existing=True)
config.set_global_enc_keys(site_name)
cert_results = engine.secrets.check_cert_expiry(site_name, duration=days)
@ -654,6 +659,7 @@ def generate():
def generate_passphrases(
*, site_name, save_location, author, interactive, force_cleartext):
engine.repository.process_repositories(site_name)
config.set_global_enc_keys(site_name)
engine.secrets.generate_passphrases(
site_name, save_location, author, interactive, force_cleartext)
@ -682,6 +688,7 @@ def generate_passphrases(
@click.argument('site_name')
def encrypt(*, save_location, author, site_name):
engine.repository.process_repositories(site_name, overwrite_existing=True)
config.set_global_enc_keys(site_name)
if save_location is None:
save_location = config.get_site_repo()
engine.secrets.encrypt(save_location, author, site_name=site_name)
@ -715,6 +722,7 @@ def encrypt(*, save_location, author, site_name):
@click.argument('site_name')
def decrypt(*, path, save_location, overwrite, site_name):
engine.repository.process_repositories(site_name)
config.set_global_enc_keys(site_name)
decrypted = engine.secrets.decrypt(path, site_name=site_name)
if overwrite:

View File

@ -25,6 +25,7 @@ from pegleg.engine import exceptions
from pegleg.engine.generators.passphrase_generator import PassphraseGenerator
from pegleg.engine.util.cryptostring import CryptoString
from pegleg.engine.util import definition
from pegleg.engine.util import encryption
from pegleg.engine.util import files
from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument as PeglegManagedSecret
@ -275,26 +276,38 @@ def get_global_creds(site_name):
:return: Either the global, or site level - passphrase and salt
"""
log_msg = "Multiple documents containing {} detected. Using latest."
config.set_passphrase()
config.set_salt()
global_passphrase = None
global_salt = None
docs = definition.site_files(site_name)
for doc in docs:
with open(doc, 'r') as f:
results = yaml.safe_load_all(f) # Validate valid YAML.
results = PeglegSecretManagement(
docs=results).get_decrypted_secrets()
for result in results:
if result['schema'] == "deckhand/Passphrase/v1":
if result['metadata']['name'] == 'global_passphrase':
if global_passphrase:
LOG.warn(log_msg.format('global_passphrase'))
global_passphrase = result['data'].encode()
if result['metadata']['name'] == 'global_salt':
if global_salt:
LOG.warn(log_msg.format('global_salt'))
global_salt = result['data'].encode()
docs = definition.documents_for_site(site_name)
for doc in docs:
if doc['schema'] == 'pegleg/PeglegManagedDocument/v1':
try:
name = doc['data']['managedDocument']['metadata']['name']
schema = doc['data']['managedDocument']['schema']
data = doc['data']['managedDocument']['data']
if schema == 'deckhand/Passphrase/v1':
if name == 'global_passphrase':
global_passphrase = encryption.decrypt(
data, config.get_passphrase(), config.get_salt())
elif name == 'global_salt':
global_salt = encryption.decrypt(
data, config.get_passphrase(), config.get_salt())
except KeyError:
continue
else:
try:
name = doc['metadata']['name']
schema = doc['schema']
data = doc['data']
if name == 'global_passphrase':
global_passphrase = data.encode()
elif name == 'global_salt':
global_salt = data.encode()
except KeyError:
continue
# Break out of search if both passphrase and salt are found
if global_passphrase and global_salt:
return (global_passphrase, global_salt)

View File

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from datetime import datetime
import logging
@ -23,6 +24,7 @@ ENCRYPTED = 'encrypted'
GENERATED = 'generated'
STORAGE_POLICY = 'storagePolicy'
METADATA = 'metadata'
DEFAULT_LAYER = 'site'
LOG = logging.getLogger(__name__)
__all__ = ['PeglegManagedSecretsDocument']
@ -71,12 +73,11 @@ class PeglegManagedSecretsDocument(object):
document.
:rtype: dict
"""
layer = secrets_document.get('metadata',
{}).get('layeringDefinition',
{}).get('layer', DEFAULT_LAYER)
layering_definition = OrderedDict(
[
('abstract', False),
# The current requirement only requires site layer.
('layer', 'site')
])
[('abstract', False), ('layer', layer)])
metadata = OrderedDict(
[
('name', secrets_document['metadata']['name']),

View File

@ -49,13 +49,6 @@ class PeglegSecretManagement(object):
config.set_passphrase()
config.set_salt()
# Check if we're working with a specific site, if so determine if the
# global encryption keys already exist. If they don't, set them.
if site_name:
if not (config.get_global_passphrase()
and config.get_global_salt()):
config.set_global_enc_keys(site_name)
if all([file_path, docs]) or not any([file_path, docs]):
raise ValueError(
'Either `file_path` or `docs` must be '

View File

@ -379,6 +379,7 @@ def test_get_global_creds_missing_creds(temp_deployment_files, tmpdir):
# Capture global credentials, verify they are not present and we default
# to site credentials instead.
config.set_global_enc_keys("cicd")
passphrase, salt = secrets.get_global_creds("cicd")
assert passphrase.decode() == 'ytrr89erARAiPE34692iwUMvWqqBvC'
assert salt.decode() == 'MySecretSalt1234567890]['
@ -395,8 +396,7 @@ def test_get_global_creds_missing_pass(temp_deployment_files, tmpdir):
# Create global salt file
with open(os.path.join(str(site_dir), 'secrets', 'passphrases',
'cicd-global-passphrase-encrypted.yaml'),
"w") as outfile:
'cicd-global-salt-encrypted.yaml'), "w") as outfile:
outfile.write(GLOBAL_SALT_DOC)
save_location = tmpdir.mkdir("encrypted_site_files")
@ -405,7 +405,7 @@ def test_get_global_creds_missing_pass(temp_deployment_files, tmpdir):
# Demonstrate that encryption fails when only the global salt or
# only the global passphrase are present among the site files.
with pytest.raises(exceptions.GlobalCredentialsNotFound):
secrets.encrypt(save_location_str, "pytest", "cicd")
config.set_global_enc_keys("cicd")
@mock.patch.dict(
@ -428,6 +428,7 @@ def test_get_global_creds(temp_deployment_files, tmpdir):
save_location_str = str(save_location)
# Encrypt the global passphrase and salt file using site passphrase/salt
config.set_global_enc_keys("cicd")
secrets.encrypt(save_location_str, "pytest", "cicd")
encrypted_files = listdir(save_location_str)
assert len(encrypted_files) > 0
@ -458,6 +459,7 @@ def test_global_encrypt_decrypt(temp_deployment_files, tmpdir):
save_location_str = str(save_location)
# Encrypt the global passphrase and salt file using site passphrase/salt
config.set_global_enc_keys("cicd")
secrets.encrypt(save_location_str, "pytest", "cicd")
# Create and encrypt a global type document

View File

@ -373,7 +373,11 @@ class TestSiteCliActions(BaseCLIActionTest):
self._validate_render_site_action(repo_path)
### Upload tests ###
@mock.patch.dict(
os.environ, {
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
"PEGLEG_SALT": "MySecretSalt1234567890]["
})
def test_upload_documents_shipyard_using_local_repo_path(self):
"""Validates ShipyardHelper is called with correct arguments."""
# Scenario:
@ -393,6 +397,11 @@ class TestSiteCliActions(BaseCLIActionTest):
assert result.exit_code == 0
mock_obj.assert_called_once()
@mock.patch.dict(
os.environ, {
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
"PEGLEG_SALT": "MySecretSalt1234567890]["
})
def test_upload_collection_callback_default_to_site_name(self):
"""Validates that collection will default to the given site_name"""
# Scenario: