Add pre-command decrypt option
Adds an option on all site commands to decrypt site files before executing the command's actions. This option will be enabled by default. If the command clones repository contents to a temporary directory, only that temporary data will be decrypted. Change-Id: Ic10c7196592c6d0e1c69a85b265259357ac28169
This commit is contained in:
parent
fff70ad861
commit
3a6e3d7cce
@ -98,9 +98,19 @@ def lint_repo(*, fail_on_missing_sub_src, exclude_lint, warn_lint):
|
||||
@utils.EXTRA_REPOSITORY_OPTION
|
||||
@utils.REPOSITORY_USERNAME_OPTION
|
||||
@utils.REPOSITORY_KEY_OPTION
|
||||
@click.option(
|
||||
'--decrypt/--no-decrypt',
|
||||
'decrypt_repos',
|
||||
default=True,
|
||||
help='Automatically attempts to decrypt repositories before executing '
|
||||
'the command. Decryption will happen after repositories are copied to '
|
||||
'the temporary directory created by pegleg or the user specified '
|
||||
'`-p` directory. This means in most situations, pre-command decrypt '
|
||||
'will not overwrite existing files. For overwriting existing files, '
|
||||
'the full decrypt command should still be used.')
|
||||
def site(
|
||||
*, site_repository, clone_path, extra_repositories, repo_key,
|
||||
repo_username):
|
||||
repo_username, decrypt_repos):
|
||||
"""Group for site-level actions, which include:
|
||||
|
||||
* list: list available sites in a manifests repo
|
||||
@ -115,7 +125,8 @@ def site(
|
||||
repo_key,
|
||||
repo_username,
|
||||
extra_repositories or [],
|
||||
run_umask=True)
|
||||
run_umask=True,
|
||||
decrypt_repos=decrypt_repos)
|
||||
|
||||
|
||||
@site.command(help='Output complete config for one site')
|
||||
|
@ -16,7 +16,9 @@ import logging
|
||||
|
||||
import click
|
||||
|
||||
from pegleg import config
|
||||
from pegleg import engine
|
||||
from pegleg import pegleg_main
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -40,9 +42,15 @@ def collection_default_callback(ctx, param, value):
|
||||
return value
|
||||
|
||||
|
||||
def decrypt_repos(site_name):
|
||||
repo_list = config.all_repos()
|
||||
for repo in repo_list:
|
||||
pegleg_main.run_decrypt(True, repo, None, site_name)
|
||||
|
||||
|
||||
# Arguments #
|
||||
SITE_REPOSITORY_ARGUMENT = click.argument(
|
||||
'site_name', callback=process_repositories_callback)
|
||||
'site_name', callback=process_repositories_callback, is_eager=True)
|
||||
|
||||
# Options #
|
||||
ALLOW_MISSING_SUBSTITUTIONS_OPTION = click.option(
|
||||
|
@ -38,7 +38,8 @@ except NameError:
|
||||
'global_salt': None,
|
||||
'salt_min_length': 24,
|
||||
'passphrase_min_length': 24,
|
||||
'default_umask': 0o027
|
||||
'default_umask': 0o027,
|
||||
'decrypt_repos': False
|
||||
}
|
||||
|
||||
|
||||
@ -214,3 +215,11 @@ def get_global_passphrase():
|
||||
def get_global_salt():
|
||||
"""Get the global salt for encryption and decryption."""
|
||||
return GLOBAL_CONTEXT['global_salt']
|
||||
|
||||
|
||||
def set_decrypt_repos(decrypt_repos=False):
|
||||
GLOBAL_CONTEXT['decrypt_repos'] = decrypt_repos
|
||||
|
||||
|
||||
def get_decrypt_repos():
|
||||
return GLOBAL_CONTEXT['decrypt_repos']
|
||||
|
@ -49,7 +49,8 @@ def run_config(
|
||||
repo_key,
|
||||
repo_username,
|
||||
extra_repositories,
|
||||
run_umask=True):
|
||||
run_umask=True,
|
||||
decrypt_repos=True):
|
||||
"""Initializes pegleg configuration data
|
||||
|
||||
:param site_repository: path or URL for site repository
|
||||
@ -60,6 +61,7 @@ def run_config(
|
||||
:param extra_repositories: list of extra repositories to read in documents
|
||||
from, specified as "type=REPO_URL/PATH"
|
||||
:param run_umask: if True, runs set_umask for os file output
|
||||
:param decrypt_repos: if True, decrypts repos before executing command
|
||||
:return:
|
||||
"""
|
||||
config.set_site_repo(site_repository)
|
||||
@ -70,6 +72,7 @@ def run_config(
|
||||
config.set_repo_username(repo_username)
|
||||
if run_umask:
|
||||
config.set_umask()
|
||||
config.set_decrypt_repos(decrypt_repos)
|
||||
|
||||
|
||||
def _run_lint_helper(
|
||||
@ -86,6 +89,20 @@ def _run_lint_helper(
|
||||
return warns
|
||||
|
||||
|
||||
def _run_precommand_decrypt(site_name):
|
||||
if config.get_decrypt_repos():
|
||||
LOG.info('Executing pre-command repository decryption...')
|
||||
repo_list = config.all_repos()
|
||||
for repo in repo_list:
|
||||
secrets_path = os.path.join(
|
||||
repo.rstrip(os.path.sep), 'site', site_name, 'secrets')
|
||||
if os.path.exists(secrets_path):
|
||||
LOG.info('Decrypting %s', secrets_path)
|
||||
run_decrypt(True, secrets_path, None, site_name)
|
||||
else:
|
||||
LOG.debug('Skipping pre-command repository decryption.')
|
||||
|
||||
|
||||
def run_lint(exclude_lint, fail_on_missing_sub_src, warn_lint):
|
||||
"""Runs linting on a repository
|
||||
|
||||
@ -116,6 +133,7 @@ def run_collect(exclude_lint, save_location, site_name, validate, warn_lint):
|
||||
:param warn_lint: output warnings for specified rules
|
||||
:return:
|
||||
"""
|
||||
_run_precommand_decrypt(site_name)
|
||||
if validate:
|
||||
# Lint the primary repo prior to document collection.
|
||||
_run_lint_helper(
|
||||
@ -154,6 +172,7 @@ def run_render(output_stream, site_name, validate):
|
||||
:param validate: if True, validate documents using schema validation
|
||||
:return:
|
||||
"""
|
||||
_run_precommand_decrypt(site_name)
|
||||
engine.site.render(site_name, output_stream, validate)
|
||||
|
||||
|
||||
@ -167,6 +186,7 @@ def run_lint_site(exclude_lint, fail_on_missing_sub_src, site_name, warn_lint):
|
||||
:param warn_lint: output warnings for specified rules
|
||||
:return:
|
||||
"""
|
||||
_run_precommand_decrypt(site_name)
|
||||
return _run_lint_helper(
|
||||
fail_on_missing_sub_src=fail_on_missing_sub_src,
|
||||
exclude_lint=exclude_lint,
|
||||
@ -195,6 +215,7 @@ def run_upload(
|
||||
:param site_name: site name to process
|
||||
:return: response from shipyard instance
|
||||
"""
|
||||
_run_precommand_decrypt(site_name)
|
||||
if not ctx.obj:
|
||||
ctx.obj = {}
|
||||
# Build API parameters required by Shipyard API Client.
|
||||
@ -237,6 +258,7 @@ def run_generate_pki(
|
||||
:param save_location: directory to store the generated site certificates in
|
||||
:return: list of paths written to
|
||||
"""
|
||||
_run_precommand_decrypt(site_name)
|
||||
engine.repository.process_repositories(site_name, overwrite_existing=True)
|
||||
pkigenerator = catalog.pki_generator.PKIGenerator(
|
||||
site_name,
|
||||
@ -264,7 +286,6 @@ def run_wrap_secret(
|
||||
:param site_name: site name to process
|
||||
:return:
|
||||
"""
|
||||
engine.repository.process_repositories(site_name, overwrite_existing=True)
|
||||
config.set_global_enc_keys(site_name)
|
||||
wrap_secret(
|
||||
author,
|
||||
@ -285,6 +306,7 @@ def run_genesis_bundle(build_dir, site_name, validators):
|
||||
:param validators: if True, runs validation scripts on genesis bundle
|
||||
:return:
|
||||
"""
|
||||
_run_precommand_decrypt(site_name)
|
||||
encryption_key = os.environ.get("PROMENADE_ENCRYPTION_KEY")
|
||||
config.set_global_enc_keys(site_name)
|
||||
bundle.build_genesis(
|
||||
@ -299,7 +321,7 @@ def run_check_pki_certs(days, site_name):
|
||||
:param site_name: site name to process
|
||||
:return:
|
||||
"""
|
||||
engine.repository.process_repositories(site_name, overwrite_existing=True)
|
||||
_run_precommand_decrypt(site_name)
|
||||
config.set_global_enc_keys(site_name)
|
||||
expiring_certs_exist, cert_results = engine.secrets.check_cert_expiry(
|
||||
site_name, duration=days)
|
||||
@ -335,7 +357,7 @@ def run_generate_passphrases(
|
||||
discovered catalogs
|
||||
:return:
|
||||
"""
|
||||
engine.repository.process_repositories(site_name)
|
||||
_run_precommand_decrypt(site_name)
|
||||
config.set_global_enc_keys(site_name)
|
||||
engine.secrets.generate_passphrases(
|
||||
site_name,
|
||||
@ -356,7 +378,6 @@ def run_encrypt(author, save_location, site_name):
|
||||
:param site_name: site name to process
|
||||
:return:
|
||||
"""
|
||||
engine.repository.process_repositories(site_name, overwrite_existing=True)
|
||||
config.set_global_enc_keys(site_name)
|
||||
if save_location is None:
|
||||
save_location = config.get_site_repo()
|
||||
@ -375,7 +396,6 @@ def run_decrypt(overwrite, path, save_location, site_name):
|
||||
:rtype: list
|
||||
"""
|
||||
decrypted_data = []
|
||||
engine.repository.process_repositories(site_name)
|
||||
config.set_global_enc_keys(site_name)
|
||||
decrypted = engine.secrets.decrypt(path, site_name=site_name)
|
||||
if overwrite:
|
||||
|
@ -11,14 +11,16 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import glob
|
||||
import os
|
||||
import subprocess
|
||||
from unittest import mock
|
||||
|
||||
from click.testing import CliRunner
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from pegleg import pegleg_main
|
||||
from pegleg.cli import commands
|
||||
from pegleg.engine import errorcodes
|
||||
from pegleg.engine.catalog import pki_utility
|
||||
@ -97,7 +99,8 @@ class TestSiteCLIOptions(BaseCLIActionTest):
|
||||
|
||||
# Note that the -p option is used to specify the clone_folder
|
||||
site_list = self.runner.invoke(
|
||||
commands.site, ['-p', tmpdir, '-r', repo_url, 'list'])
|
||||
commands.site,
|
||||
['--no-decrypt', '-p', tmpdir, '-r', repo_url, 'list'])
|
||||
|
||||
assert site_list.exit_code == 0
|
||||
# Verify that the repo was cloned into the clone_path
|
||||
@ -118,7 +121,8 @@ class TestSiteCLIOptions(BaseCLIActionTest):
|
||||
|
||||
# Note that the -p option is used to specify the clone_folder
|
||||
site_list = self.runner.invoke(
|
||||
commands.site, ['-p', tmpdir, '-r', repo_path, 'list'])
|
||||
commands.site,
|
||||
['--no-decrypt', '-p', tmpdir, '-r', repo_path, 'list'])
|
||||
|
||||
assert site_list.exit_code == 0
|
||||
# Verify that passing in clone_path when using local repo has no effect
|
||||
@ -146,14 +150,16 @@ class TestSiteCLIOptionsNegative(BaseCLIActionTest):
|
||||
|
||||
# Note that the -p option is used to specify the clone_folder
|
||||
site_list = self.runner.invoke(
|
||||
commands.site, ['-p', tmpdir, '-r', repo_url, 'list'])
|
||||
commands.site,
|
||||
['--no-decrypt', '-p', tmpdir, '-r', repo_url, 'list'])
|
||||
|
||||
assert git.is_repository(os.path.join(tmpdir, self.repo_name))
|
||||
|
||||
# Run site list for a second time to validate that the repo can't be
|
||||
# cloned twice in the same clone_path
|
||||
site_list = self.runner.invoke(
|
||||
commands.site, ['-p', tmpdir, '-r', repo_url, 'list'])
|
||||
commands.site,
|
||||
['--no-decrypt', '-p', tmpdir, '-r', repo_url, 'list'])
|
||||
|
||||
assert site_list.exit_code == 1
|
||||
assert 'File exists' in site_list.output
|
||||
@ -167,8 +173,8 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
def _validate_collect_site_action(self, repo_path_or_url, save_location):
|
||||
result = self.runner.invoke(
|
||||
commands.site, [
|
||||
'-r', repo_path_or_url, 'collect', self.site_name, '-s',
|
||||
save_location
|
||||
'--no-decrypt', '-r', repo_path_or_url, 'collect',
|
||||
self.site_name, '-s', save_location
|
||||
])
|
||||
|
||||
collected_files = os.listdir(save_location)
|
||||
@ -219,7 +225,9 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
def _test_lint_site_action(self, repo_path_or_url, exclude=True):
|
||||
flag = '-x' if exclude else '-w'
|
||||
|
||||
lint_command = ['-r', repo_path_or_url, 'lint', self.site_name]
|
||||
lint_command = [
|
||||
'--no-decrypt', '-r', repo_path_or_url, 'lint', self.site_name
|
||||
]
|
||||
exclude_lint_command = [
|
||||
flag, errorcodes.SCHEMA_STORAGE_POLICY_MISMATCH_FLAG, flag,
|
||||
errorcodes.SECRET_NOT_ENCRYPTED_POLICY
|
||||
@ -275,7 +283,10 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
def _validate_list_site_action(self, repo_path_or_url, tmpdir):
|
||||
mock_output = os.path.join(tmpdir, 'output')
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path_or_url, 'list', '-o', mock_output])
|
||||
commands.site, [
|
||||
'--no-decrypt', '-r', repo_path_or_url, 'list', '-o',
|
||||
mock_output
|
||||
])
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
with open(mock_output, 'r') as f:
|
||||
@ -309,8 +320,8 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
mock_output = os.path.join(tmpdir, 'output')
|
||||
result = self.runner.invoke(
|
||||
commands.site, [
|
||||
'-r', repo_path_or_url, 'show', self.site_name, '-o',
|
||||
mock_output
|
||||
'--no-decrypt', '-r', repo_path_or_url, 'show', self.site_name,
|
||||
'-o', mock_output
|
||||
])
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
@ -340,7 +351,9 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
### Render tests ###
|
||||
|
||||
def _validate_render_site_action(self, repo_path_or_url):
|
||||
render_command = ['-r', repo_path_or_url, 'render', self.site_name]
|
||||
render_command = [
|
||||
'--no-decrypt', '-r', repo_path_or_url, 'render', self.site_name
|
||||
]
|
||||
|
||||
with mock.patch('pegleg.engine.site.yaml') as mock_yaml:
|
||||
with mock.patch(
|
||||
@ -390,8 +403,8 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
with mock.patch('pegleg.pegleg_main.ShipyardHelper') as mock_obj:
|
||||
result = self.runner.invoke(
|
||||
commands.site, [
|
||||
'-r', repo_path, 'upload', self.site_name, '--collection',
|
||||
'collection'
|
||||
'--no-decrypt', '-r', repo_path, 'upload', self.site_name,
|
||||
'--collection', 'collection'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
@ -413,7 +426,8 @@ class TestSiteCliActions(BaseCLIActionTest):
|
||||
|
||||
with mock.patch('pegleg.pegleg_main.ShipyardHelper') as mock_obj:
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path, 'upload', self.site_name])
|
||||
commands.site,
|
||||
['--no-decrypt', '-r', repo_path, 'upload', self.site_name])
|
||||
assert result.exit_code == 0
|
||||
mock_obj.assert_called_once()
|
||||
|
||||
@ -527,7 +541,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
secrets_opts = ['secrets', 'generate', 'certificates', self.site_name]
|
||||
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_url] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', '-r', repo_url] + secrets_opts)
|
||||
self._validate_generate_pki_action(result)
|
||||
|
||||
@pytest.mark.skipif(
|
||||
@ -543,7 +557,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
secrets_opts = ['secrets', 'generate', 'certificates', self.site_name]
|
||||
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts)
|
||||
self._validate_generate_pki_action(result)
|
||||
|
||||
@pytest.mark.skipif(
|
||||
@ -574,7 +588,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
|
||||
secrets_opts = ['secrets', 'encrypt', '-a', 'test', self.site_name]
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts)
|
||||
|
||||
assert result.exit_code == 0
|
||||
|
||||
@ -590,7 +604,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
'secrets', 'decrypt', '--path', file_path, self.site_name
|
||||
]
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts)
|
||||
assert result.exit_code == 0, result.output
|
||||
|
||||
@pytest.mark.skipif(
|
||||
@ -600,7 +614,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
repo_path = self.treasuremap_path
|
||||
secrets_opts = ['secrets', 'check-pki-certs', self.site_name]
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts)
|
||||
assert result.exit_code == 1, result.output
|
||||
|
||||
@pytest.mark.skipif(
|
||||
@ -610,7 +624,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
repo_path = self.treasuremap_path
|
||||
secrets_opts = ['secrets', 'check-pki-certs', 'airsloop']
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts)
|
||||
assert result.exit_code == 0, result.output
|
||||
|
||||
@mock.patch.dict(
|
||||
@ -638,7 +652,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
"--no-encrypt", self.site_name
|
||||
]
|
||||
result = self.runner.invoke(
|
||||
commands.site, ["-r", repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', "-r", repo_path] + secrets_opts)
|
||||
assert result.exit_code == 0
|
||||
|
||||
with open(output_path, "r") as output_fi:
|
||||
@ -660,7 +674,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
|
||||
"test-certificate", "-l", "site", self.site_name
|
||||
]
|
||||
result = self.runner.invoke(
|
||||
commands.site, ["-r", repo_path] + secrets_opts)
|
||||
commands.site, ['--no-decrypt', "-r", repo_path] + secrets_opts)
|
||||
assert result.exit_code == 0
|
||||
|
||||
with open(output_path, "r") as output_fi:
|
||||
@ -720,7 +734,10 @@ class TestSiteCliActionsWithSubdirectory(BaseCLIActionTest):
|
||||
def _validate_list_site_action(self, repo_path_or_url, tmpdir):
|
||||
mock_output = os.path.join(tmpdir, 'output')
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['-r', repo_path_or_url, 'list', '-o', mock_output])
|
||||
commands.site, [
|
||||
'--no-decrypt', '-r', repo_path_or_url, 'list', '-o',
|
||||
mock_output
|
||||
])
|
||||
|
||||
with open(mock_output, 'r') as f:
|
||||
table_output = f.read()
|
||||
@ -758,3 +775,218 @@ class TestSiteCliActionsWithSubdirectory(BaseCLIActionTest):
|
||||
repo_path = os.path.join(_repo_path, 'deployment_files')
|
||||
|
||||
self._validate_list_site_action(repo_path, tmpdir)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('monkeypatch')
|
||||
class TestCliSiteSubcommandsWithDecryptOption(BaseCLIActionTest):
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(TestCliSiteSubcommandsWithDecryptOption, cls).setup_class()
|
||||
cls.runner = CliRunner(
|
||||
env={
|
||||
"PEGLEG_PASSPHRASE": 'ytrr89erARAiPE34692iwUMvWqqBvC',
|
||||
"PEGLEG_SALT": "MySecretSalt1234567890][",
|
||||
"PROMENADE_ENCRYPTION_KEY": "test"
|
||||
})
|
||||
for file in glob.iglob(os.path.join(cls.treasuremap_path, 'site',
|
||||
'seaworthy', 'secrets', '**',
|
||||
'*.yaml'), recursive=True):
|
||||
args = [
|
||||
'sed', '-i',
|
||||
's/storagePolicy: cleartext/storagePolicy: encrypted/g', file
|
||||
]
|
||||
sed_output = subprocess.check_output(args, shell=False)
|
||||
assert not sed_output
|
||||
|
||||
@mock.patch.dict(
|
||||
os.environ, {
|
||||
"PEGLEG_PASSPHRASE": 'ytrr89erARAiPE34692iwUMvWqqBvC',
|
||||
"PEGLEG_SALT": "MySecretSalt1234567890]["
|
||||
})
|
||||
def setup(self):
|
||||
pegleg_main.run_config(
|
||||
self.treasuremap_path, None, None, None, [], True, False)
|
||||
pegleg_main.run_encrypt('zuul-tester', None, self.site_name)
|
||||
|
||||
@staticmethod
|
||||
def _validate_no_files_encrypted(path):
|
||||
for file in glob.iglob(os.path.join(path, '**', '*.yaml'),
|
||||
recursive=True):
|
||||
with open(file, 'r') as f:
|
||||
data = f.read()
|
||||
if 'pegleg/PeglegManagedDocument/v1' in data:
|
||||
return False
|
||||
return True
|
||||
|
||||
def test_collect_using_decrypt_option(self, tmpdir):
|
||||
"""Validates collect action using a path to a local repo."""
|
||||
# Scenario:
|
||||
#
|
||||
# 1) Create temporary save location
|
||||
# 2) Collect into save location (should skip clone repo)
|
||||
# 3) Check that expected file name is there
|
||||
|
||||
repo_path = self.treasuremap_path
|
||||
result = self.runner.invoke(
|
||||
commands.site, [
|
||||
'--decrypt', '-r', repo_path, 'collect', self.site_name, '-s',
|
||||
tmpdir
|
||||
])
|
||||
|
||||
collected_files = os.listdir(tmpdir)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert len(collected_files) == 1
|
||||
# Validates that site manifests collected from cloned repositories
|
||||
# are written out to sensibly named files like airship-treasuremap.yaml
|
||||
assert collected_files[0] == ("%s.yaml" % self.repo_name)
|
||||
assert self._validate_no_files_encrypted(tmpdir)
|
||||
|
||||
def test_render_site_using_decrypt_option(self, tmpdir):
|
||||
"""Validates render action using local repo path."""
|
||||
# Scenario:
|
||||
#
|
||||
# 1) Mock out Deckhand render (so we can ignore P005 issues)
|
||||
# 2) Render site (should skip clone repo)
|
||||
|
||||
repo_path = self.treasuremap_path
|
||||
render_command = [
|
||||
'--decrypt', '-p', tmpdir, '-r', repo_path, 'render',
|
||||
self.site_name
|
||||
]
|
||||
|
||||
with mock.patch('pegleg.engine.site.yaml') as mock_yaml:
|
||||
with mock.patch(
|
||||
'pegleg.engine.site.util.deckhand') as mock_deckhand:
|
||||
mock_deckhand.deckhand_render.return_value = ([], [])
|
||||
result = self.runner.invoke(commands.site, render_command)
|
||||
|
||||
assert result.exit_code == 0
|
||||
mock_yaml.dump_all.assert_called_once()
|
||||
assert self._validate_no_files_encrypted(
|
||||
os.path.join(
|
||||
tmpdir, 'treasuremap.git', 'site', 'seaworthy', 'secrets'))
|
||||
|
||||
def test_lint_site_using_decrypt_option(self, tmpdir):
|
||||
"""Validates site lint action using local repo path."""
|
||||
# Scenario:
|
||||
#
|
||||
# 1) Mock out Deckhand render (so we can ignore P005 issues)
|
||||
# 2) Lint site with warn flags (should skip clone repo)
|
||||
|
||||
repo_path = self.treasuremap_path
|
||||
|
||||
lint_command = [
|
||||
'--decrypt', '-p', tmpdir, '-r', repo_path, 'lint', self.site_name
|
||||
]
|
||||
exclude_lint_command = [
|
||||
'-w', errorcodes.SCHEMA_STORAGE_POLICY_MISMATCH_FLAG, '-w',
|
||||
errorcodes.SECRET_NOT_ENCRYPTED_POLICY
|
||||
]
|
||||
|
||||
with mock.patch('pegleg.engine.site.util.deckhand') as mock_deckhand:
|
||||
mock_deckhand.deckhand_render.return_value = ([], [])
|
||||
result = self.runner.invoke(
|
||||
commands.site, lint_command + exclude_lint_command)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert self._validate_no_files_encrypted(
|
||||
os.path.join(
|
||||
tmpdir, 'treasuremap.git', 'site', 'seaworthy', 'secrets'))
|
||||
|
||||
@mock.patch.dict(
|
||||
os.environ, {
|
||||
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
|
||||
"PEGLEG_SALT": "MySecretSalt1234567890]["
|
||||
})
|
||||
def test_upload_collection_callback_default_to_site_name(self, tmpdir):
|
||||
"""Validates that collection will default to the given site_name"""
|
||||
# Scenario:
|
||||
#
|
||||
# 1) Mock out ShipyardHelper
|
||||
# 2) Check that ShipyardHelper was called with collection set to
|
||||
# site_name
|
||||
repo_path = self.treasuremap_path
|
||||
|
||||
with mock.patch('pegleg.pegleg_main.ShipyardHelper') as mock_obj:
|
||||
result = self.runner.invoke(
|
||||
commands.site, [
|
||||
'--decrypt', '-p', tmpdir, '-r', repo_path, 'upload',
|
||||
self.site_name
|
||||
])
|
||||
assert result.exit_code == 0
|
||||
mock_obj.assert_called_once()
|
||||
assert self._validate_no_files_encrypted(
|
||||
os.path.join(
|
||||
tmpdir, 'treasuremap.git', 'site', 'seaworthy', 'secrets'))
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not pki_utility.PKIUtility.cfssl_exists(),
|
||||
reason='cfssl must be installed to execute these tests')
|
||||
def test_site_secrets_generate_pki_using_decrypt_option(self, tmpdir):
|
||||
"""Validates ``generate certificates`` action using local repo path."""
|
||||
# Scenario:
|
||||
#
|
||||
# 1) Generate PKI using local repo path
|
||||
|
||||
repo_path = self.treasuremap_path
|
||||
secrets_opts = ['secrets', 'generate', 'certificates', self.site_name]
|
||||
|
||||
result = self.runner.invoke(
|
||||
commands.site,
|
||||
['--decrypt', '-p', tmpdir, '-r', repo_path] + secrets_opts)
|
||||
assert result.exit_code == 0
|
||||
|
||||
generated_files = []
|
||||
output_lines = result.output.split("\n")
|
||||
for line in output_lines:
|
||||
if self.repo_name in line:
|
||||
generated_files.append(line)
|
||||
|
||||
assert len(generated_files), 'No secrets were generated'
|
||||
for generated_file in generated_files:
|
||||
with open(generated_file, 'r') as f:
|
||||
result = yaml.safe_load_all(f) # Validate valid YAML.
|
||||
assert list(result), "%s file is empty" % generated_file
|
||||
assert self._validate_no_files_encrypted(
|
||||
os.path.join(
|
||||
tmpdir, 'treasuremap.git', 'site', 'seaworthy', 'secrets'))
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not pki_utility.PKIUtility.cfssl_exists(),
|
||||
reason='cfssl must be installed to execute these tests')
|
||||
def test_check_pki_certs_expired_using_decrypt_option(self):
|
||||
repo_path = self.treasuremap_path
|
||||
secrets_opts = ['secrets', 'check-pki-certs', self.site_name]
|
||||
result = self.runner.invoke(
|
||||
commands.site, ['--decrypt', '-r', repo_path] + secrets_opts)
|
||||
assert result.exit_code == 1, result.output
|
||||
assert self._validate_no_files_encrypted(
|
||||
os.path.join(repo_path, 'site', 'seaworthy', 'secrets'))
|
||||
|
||||
def test_genesis_bundle_using_decrypt_option(self, tmpdir):
|
||||
repo_path = self.treasuremap_path
|
||||
args = [
|
||||
'--decrypt', '-p', tmpdir, '-r', repo_path, 'genesis_bundle', '-b',
|
||||
tmpdir, self.site_name
|
||||
]
|
||||
with mock.patch(
|
||||
'pegleg.pegleg_main.bundle.build_genesis') as mock_build:
|
||||
result = self.runner.invoke(commands.site, args)
|
||||
assert result.exit_code == 0
|
||||
assert self._validate_no_files_encrypted(tmpdir)
|
||||
mock_build.assert_called_once()
|
||||
|
||||
def test_generate_passphrases_using_decrypt_option(self, tmpdir):
|
||||
repo_path = self.treasuremap_path
|
||||
args = [
|
||||
'--decrypt', '-p', tmpdir, '-r', repo_path, 'secrets', 'generate',
|
||||
'passphrases', '-s', repo_path, '-a', 'zuul_tester', self.site_name
|
||||
]
|
||||
with mock.patch(
|
||||
'pegleg.pegleg_main.engine.secrets.generate_passphrases'
|
||||
) as mock_generator:
|
||||
result = self.runner.invoke(commands.site, args)
|
||||
assert result.exit_code == 0
|
||||
assert self._validate_no_files_encrypted(tmpdir)
|
||||
mock_generator.assert_called_once()
|
||||
|
Loading…
Reference in New Issue
Block a user