Implement default umask for 640 file permissions

Some secrets are being created with undesirable permissions. Upon
inspection it was noticed that in general Pegleg is creating files,
then changing permissions after the fact. This leads to a small
window where the permissions on a file are overly permissive.

This patchset:
1. Sets default umask of 0o027 (640 permissions for files)
2. Explicitly adds the open flag ('r', 'w' etc.) to all open() calls.
3. Replaces sys.stdout.write calls with click.echo() calls to be more
   in line with the rest of the project.
4. Re-orders methods that write so that data is always first, and the
   path is always second.
5. Updates unit tests.
6. Adds unit tests for testing directory and file permissions.
7. Minor style changes.

Change-Id: I0c154aa311ea371940fd24b0aabf58fffaf1d231
This commit is contained in:
HUGHES, ALEXANDER (ah8742) 2019-06-19 10:34:24 -05:00 committed by Alexander Hughes
parent 6c07c70dec
commit a8620cfd8d
16 changed files with 531 additions and 524 deletions

View File

@ -15,7 +15,6 @@
import functools
import logging
import os
import sys
import click
@ -53,7 +52,7 @@ MAIN_REPOSITORY_OPTION = click.option(
'site_repository',
required=True,
help='Path or URL to the primary repository (containing '
'site_definition.yaml) repo.')
'site_definition.yaml) repo.')
EXTRA_REPOSITORY_OPTION = click.option(
'-e',
@ -61,42 +60,42 @@ EXTRA_REPOSITORY_OPTION = click.option(
'extra_repositories',
multiple=True,
help='Path or URL of additional repositories. These should be named per '
'the site-definition file, e.g. -e global=/opt/global -e '
'secrets=/opt/secrets. By default, the revision specified in the '
'site-definition for the site will be leveraged but can be '
'overridden using -e global=/opt/global@revision.')
'the site-definition file, e.g. -e global=/opt/global -e '
'secrets=/opt/secrets. By default, the revision specified in the '
'site-definition for the site will be leveraged but can be '
'overridden using -e global=/opt/global@revision.')
REPOSITORY_KEY_OPTION = click.option(
'-k',
'--repo-key',
'repo_key',
help='The SSH public key to use when cloning remote authenticated '
'repositories.')
'repositories.')
REPOSITORY_USERNAME_OPTION = click.option(
'-u',
'--repo-username',
'repo_username',
help='The SSH username to use when cloning remote authenticated '
'repositories specified in the site-definition file. Any '
'occurrences of REPO_USERNAME will be replaced with this '
'value.\n'
'Use only if REPO_USERNAME appears in a repo URL.')
'repositories specified in the site-definition file. Any '
'occurrences of REPO_USERNAME will be replaced with this '
'value.\n'
'Use only if REPO_USERNAME appears in a repo URL.')
REPOSITORY_CLONE_PATH_OPTION = click.option(
'-p',
'--clone-path',
'clone_path',
help='The path where the repo will be cloned. By default the repo will be '
'cloned to the /tmp path. If this option is '
'included and the repo already '
'exists, then the repo will not be cloned again and the '
'user must specify a new clone path or pass in the local copy '
'of the repository as the site repository. Suppose the repo '
'name is airship/treasuremap and the clone path is '
'/tmp/mypath then the following directory is '
'created /tmp/mypath/airship/treasuremap '
'which will contain the contents of the repo')
'cloned to the /tmp path. If this option is '
'included and the repo already '
'exists, then the repo will not be cloned again and the '
'user must specify a new clone path or pass in the local copy '
'of the repository as the site repository. Suppose the repo '
'name is airship/treasuremap and the clone path is '
'/tmp/mypath then the following directory is '
'created /tmp/mypath/airship/treasuremap '
'which will contain the contents of the repo')
ALLOW_MISSING_SUBSTITUTIONS_OPTION = click.option(
'-f',
@ -113,7 +112,7 @@ EXCLUDE_LINT_OPTION = click.option(
'exclude_lint',
multiple=True,
help='Excludes specified linting checks. Warnings will still be issued. '
'-w takes priority over -x.')
'-w takes priority over -x.')
WARN_LINT_OPTION = click.option(
'-w',
@ -127,12 +126,11 @@ SITE_REPOSITORY_ARGUMENT = click.argument(
@click.group(context_settings=CONTEXT_SETTINGS)
@click.option(
'-v',
'--verbose',
is_flag=True,
default=False,
help='Enable debug logging')
@click.option('-v',
'--verbose',
is_flag=True,
default=False,
help='Enable debug logging')
def main(*, verbose):
"""Main CLI meta-group, which includes the following groups:
@ -166,6 +164,7 @@ def repo(*, site_repository, clone_path, repo_key, repo_username):
config.set_clone_path(clone_path)
config.set_repo_key(repo_key)
config.set_repo_username(repo_username)
config.set_umask()
def _lint_helper(*,
@ -178,10 +177,9 @@ def _lint_helper(*,
func = functools.partial(engine.lint.site, site_name=site_name)
else:
func = engine.lint.full
warns = func(
fail_on_missing_sub_src=fail_on_missing_sub_src,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
warns = func(fail_on_missing_sub_src=fail_on_missing_sub_src,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
if warns:
click.echo("Linting passed, but produced some warnings.")
for w in warns:
@ -196,10 +194,9 @@ def lint_repo(*, fail_on_missing_sub_src, exclude_lint, warn_lint):
"""Lint all sites using checks defined in :mod:`pegleg.engine.errorcodes`.
"""
engine.repository.process_site_repository(update_config=True)
_lint_helper(
fail_on_missing_sub_src=fail_on_missing_sub_src,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
_lint_helper(fail_on_missing_sub_src=fail_on_missing_sub_src,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
@main.group(help='Commands related to sites')
@ -224,15 +221,15 @@ def site(*, site_repository, clone_path, extra_repositories, repo_key,
config.set_extra_repo_overrides(extra_repositories or [])
config.set_repo_key(repo_key)
config.set_repo_username(repo_username)
config.set_umask()
@site.command(help='Output complete config for one site')
@click.option(
'-s',
'--save-location',
'save_location',
help='Directory to output the complete site definition. Created '
'automatically if it does not already exist.')
@click.option('-s',
'--save-location',
'save_location',
help='Directory to output the complete site definition. Created '
'automatically if it does not already exist.')
@click.option(
'--validate/--no-validate',
'validate',
@ -248,13 +245,12 @@ def site(*, site_repository, clone_path, extra_repositories, repo_key,
'exclude_lint',
multiple=True,
help='Excludes specified linting checks. Warnings will still be issued. '
'-w takes priority over -x.')
@click.option(
'-w',
'--warn',
'warn_lint',
multiple=True,
help='Warn if linting check fails. -w takes priority over -x.')
'-w takes priority over -x.')
@click.option('-w',
'--warn',
'warn_lint',
multiple=True,
help='Warn if linting check fails. -w takes priority over -x.')
@SITE_REPOSITORY_ARGUMENT
def collect(*, save_location, validate, exclude_lint, warn_lint, site_name):
"""Collects documents into a single site-definition.yaml file, which
@ -269,51 +265,29 @@ def collect(*, save_location, validate, exclude_lint, warn_lint, site_name):
"""
if validate:
# Lint the primary repo prior to document collection.
_lint_helper(
site_name=site_name,
fail_on_missing_sub_src=True,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
_lint_helper(site_name=site_name,
fail_on_missing_sub_src=True,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
engine.site.collect(site_name, save_location)
@site.command('list', help='List known sites')
@click.option(
'-o',
'--output',
'output_stream',
type=click.File(mode='w'),
default=sys.stdout,
show_default=True,
help='Where to output.')
@click.option('-o', '--output', 'output_stream', help='Where to output.')
def list_sites(*, output_stream):
engine.repository.process_site_repository(update_config=True)
engine.site.list_(output_stream)
@site.command(help='Show details for one site')
@click.option(
'-o',
'--output',
'output_stream',
type=click.File(mode='w'),
default=sys.stdout,
show_default=True,
help='Where to output.')
@click.option('-o', '--output', 'output_stream', help='Where to output.')
@SITE_REPOSITORY_ARGUMENT
def show(*, output_stream, site_name):
engine.site.show(site_name, output_stream)
@site.command('render', help='Render a site through the deckhand engine')
@click.option(
'-o',
'--output',
'output_stream',
type=click.File(mode='w'),
default=sys.stdout,
show_default=True,
help='Where to output.')
@click.option('-o', '--output', 'output_stream', help='Where to output.')
@click.option(
'-v',
'--validate',
@ -322,8 +296,8 @@ def show(*, output_stream, site_name):
default=True,
show_default=True,
help='Whether to pre-validate documents using built-in schema validation. '
'Skips over externally registered DataSchema documents to avoid '
'false positives.')
'Skips over externally registered DataSchema documents to avoid '
'false positives.')
@SITE_REPOSITORY_ARGUMENT
def render(*, output_stream, site_name, validate):
engine.site.render(site_name, output_stream, validate)
@ -338,11 +312,10 @@ def lint_site(*, fail_on_missing_sub_src, exclude_lint, warn_lint, site_name):
"""Lint a given site using checks defined in
:mod:`pegleg.engine.errorcodes`.
"""
_lint_helper(
site_name=site_name,
fail_on_missing_sub_src=fail_on_missing_sub_src,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
_lint_helper(site_name=site_name,
fail_on_missing_sub_src=fail_on_missing_sub_src,
exclude_lint=exclude_lint,
warn_lint=warn_lint)
def collection_default_callback(ctx, param, value):
@ -365,14 +338,13 @@ def collection_default_callback(ctx, param, value):
@click.option('--os-project-name', envvar='OS_PROJECT_NAME', required=False)
@click.option('--os-username', envvar='OS_USERNAME', required=False)
@click.option('--os-password', envvar='OS_PASSWORD', required=False)
@click.option(
'--os-auth-url', envvar='OS_AUTH_URL', required=False)
@click.option('--os-auth-url', envvar='OS_AUTH_URL', required=False)
# Option passed to Shipyard client context
@click.option(
'--context-marker',
help='Specifies a UUID (8-4-4-4-12 format) that will be used to correlate '
'logs, transactions, etc. in downstream activities triggered by this '
'interaction ',
'logs, transactions, etc. in downstream activities triggered by this '
'interaction ',
required=False,
type=click.UUID)
@click.option(
@ -384,17 +356,16 @@ def collection_default_callback(ctx, param, value):
show_default=True,
type=click.Choice(['append', 'replace']),
help='Set the buffer mode when uploading documents. Supported buffer '
'modes include append, replace, auto.\n'
'append: Add the collection to the Shipyard Buffer, only if that '
'collection does not already exist in the Shipyard buffer.\n'
'replace: Clear the Shipyard Buffer before adding the specified '
'collection.\n')
@click.option(
'--collection',
'collection',
help='Specifies the name to use for the uploaded collection. '
'Defaults to the specified `site_name`.',
callback=collection_default_callback)
'modes include append, replace, auto.\n'
'append: Add the collection to the Shipyard Buffer, only if that '
'collection does not already exist in the Shipyard buffer.\n'
'replace: Clear the Shipyard Buffer before adding the specified '
'collection.\n')
@click.option('--collection',
'collection',
help='Specifies the name to use for the uploaded collection. '
'Defaults to the specified `site_name`.',
callback=collection_default_callback)
@SITE_REPOSITORY_ARGUMENT
@click.pass_context
def upload(ctx, *, os_project_domain_name, os_user_domain_name,
@ -413,9 +384,7 @@ def upload(ctx, *, os_project_domain_name, os_user_domain_name,
'auth_url': os_auth_url
}
ctx.obj['API_PARAMETERS'] = {
'auth_vars': auth_vars
}
ctx.obj['API_PARAMETERS'] = {'auth_vars': auth_vars}
ctx.obj['context_marker'] = str(context_marker)
ctx.obj['site_name'] = site_name
ctx.obj['collection'] = collection
@ -423,9 +392,7 @@ def upload(ctx, *, os_project_domain_name, os_user_domain_name,
click.echo(ShipyardHelper(ctx, buffer_mode).upload_documents())
@site.group(
name='secrets',
help='Commands to manage site secrets documents')
@site.group(name='secrets', help='Commands to manage site secrets documents')
def secrets():
pass
@ -433,23 +400,22 @@ def secrets():
@secrets.command(
'generate-pki',
help='Generate certificates and keys according to all PKICatalog '
'documents in the site. Regenerating certificates can be '
'accomplished by re-running this command.')
'documents in the site. Regenerating certificates can be '
'accomplished by re-running this command.')
@click.option(
'-a',
'--author',
'author',
help='Identifying name of the author generating new certificates. Used'
'for tracking provenance information in the PeglegManagedDocuments. '
'An attempt is made to automatically determine this value, '
'but should be provided.')
@click.option(
'-d',
'--days',
'days',
default=365,
show_default=True,
help='Duration in days generated certificates should be valid.')
'for tracking provenance information in the PeglegManagedDocuments. '
'An attempt is made to automatically determine this value, '
'but should be provided.')
@click.option('-d',
'--days',
'days',
default=365,
show_default=True,
help='Duration in days generated certificates should be valid.')
@click.argument('site_name')
def generate_pki(site_name, author, days):
"""Generate certificates, certificate authorities and keypairs for a given
@ -457,10 +423,10 @@ def generate_pki(site_name, author, days):
"""
engine.repository.process_repositories(site_name,
overwrite_existing=True)
pkigenerator = catalog.pki_generator.PKIGenerator(
site_name, author=author, duration=days)
engine.repository.process_repositories(site_name, overwrite_existing=True)
pkigenerator = catalog.pki_generator.PKIGenerator(site_name,
author=author,
duration=days)
output_paths = pkigenerator.generate()
click.echo("Generated PKI files written to:\n%s" % '\n'.join(output_paths))
@ -469,91 +435,79 @@ def generate_pki(site_name, author, days):
@secrets.command(
'wrap',
help='Wrap bare files (e.g. pem or crt) in a PeglegManagedDocument '
'and encrypt them (by default).')
@click.option(
'-a',
'--author',
'author',
help='Author for the new wrapped file.')
@click.option(
'--filename',
'filename',
help='The relative file path for the file to be wrapped.')
'and encrypt them (by default).')
@click.option('-a',
'--author',
'author',
help='Author for the new wrapped file.')
@click.option('--filename',
'filename',
help='The relative file path for the file to be wrapped.')
@click.option(
'-o',
'--output-path',
'output_path',
required=False,
help='The output path for the wrapped file. (default: input path with '
'.yaml)')
@click.option(
'-s',
'--schema',
'schema',
help='The schema for the document to be wrapped, e.g. '
'deckhand/Certificate/v1')
@click.option(
'-n',
'--name',
'name',
help='The name for the document to be wrapped, e.g. new-cert')
@click.option(
'-l',
'--layer',
'layer',
help='The layer for the document to be wrapped., e.g. site.')
@click.option(
'--encrypt/--no-encrypt',
'encrypt',
is_flag=True,
default=True,
show_default=True,
help='Whether to encrypt the wrapped file.')
'.yaml)')
@click.option('-s',
'--schema',
'schema',
help='The schema for the document to be wrapped, e.g. '
'deckhand/Certificate/v1')
@click.option('-n',
'--name',
'name',
help='The name for the document to be wrapped, e.g. new-cert')
@click.option('-l',
'--layer',
'layer',
help='The layer for the document to be wrapped., e.g. site.')
@click.option('--encrypt/--no-encrypt',
'encrypt',
is_flag=True,
default=True,
show_default=True,
help='Whether to encrypt the wrapped file.')
@click.argument('site_name')
def wrap_secret_cli(*, site_name, author, filename, output_path, schema,
name, layer, encrypt):
def wrap_secret_cli(*, site_name, author, filename, output_path, schema, name,
layer, encrypt):
"""Wrap a bare secrets file in a YAML and ManagedDocument.
"""
engine.repository.process_repositories(site_name,
overwrite_existing=True)
wrap_secret(author, filename, output_path, schema,
name, layer, encrypt)
engine.repository.process_repositories(site_name, overwrite_existing=True)
wrap_secret(author, filename, output_path, schema, name, layer, encrypt)
@site.command(
'genesis_bundle',
help='Construct the genesis deployment bundle.')
@click.option(
'-b',
'--build-dir',
'build_dir',
type=click.Path(file_okay=False, dir_okay=True, resolve_path=True),
required=True,
help='Destination directory to store the genesis bundle.')
@site.command('genesis_bundle',
help='Construct the genesis deployment bundle.')
@click.option('-b',
'--build-dir',
'build_dir',
type=click.Path(file_okay=False,
dir_okay=True,
resolve_path=True),
required=True,
help='Destination directory to store the genesis bundle.')
@click.option(
'--include-validators',
'validators',
is_flag=True,
default=False,
help='A flag to request generate genesis validation scripts in addition '
'to genesis.sh script.')
'to genesis.sh script.')
@SITE_REPOSITORY_ARGUMENT
def genesis_bundle(*, build_dir, validators, site_name):
encryption_key = os.environ.get("PROMENADE_ENCRYPTION_KEY")
bundle.build_genesis(build_dir,
encryption_key,
validators,
logging.DEBUG == LOG.getEffectiveLevel(),
site_name
)
bundle.build_genesis(build_dir, encryption_key, validators,
logging.DEBUG == LOG.getEffectiveLevel(), site_name)
@secrets.command(
'check-pki-certs',
help='Determine if certificates in a sites PKICatalog are expired or '
'expiring within a specified number of days.')
'expiring within a specified number of days.')
@click.option(
'-d',
'--days',
@ -564,13 +518,12 @@ def genesis_bundle(*, build_dir, validators, site_name):
def check_pki_certs(site_name, days):
"""Check PKI certificates of a site for expiration."""
engine.repository.process_repositories(site_name,
overwrite_existing=True)
engine.repository.process_repositories(site_name, overwrite_existing=True)
cert_results = engine.secrets.check_cert_expiry(site_name, duration=days)
click.echo("The following certs will expire within {} days: \n{}"
.format(days, cert_results))
click.echo("The following certs will expire within {} days: \n{}".format(
days, cert_results))
@main.group(help='Commands related to types')
@ -594,30 +547,20 @@ def type(*, site_repository, clone_path, extra_repositories, repo_key,
@type.command('list', help='List known types')
@click.option(
'-o',
'--output',
'output_stream',
type=click.File(mode='w'),
default=sys.stdout,
show_default=True,
help='Where to output.')
@click.option('-o', '--output', 'output_stream', help='Where to output.')
def list_types(*, output_stream):
"""List type names for a given repository."""
engine.repository.process_site_repository(update_config=True)
engine.type.list_types(output_stream)
@secrets.group(
name='generate',
help='Command group to generate site secrets documents.')
@secrets.group(name='generate',
help='Command group to generate site secrets documents.')
def generate():
pass
@generate.command(
'passphrases',
help='Command to generate site passphrases')
@generate.command('passphrases', help='Command to generate site passphrases')
@click.argument('site_name')
@click.option(
'-s',
@ -625,24 +568,23 @@ def generate():
'save_location',
required=True,
help='Directory to store the generated site passphrases in. It will '
'be created automatically, if it does not already exist. The '
'generated, wrapped, and encrypted passphrases files will be saved '
'in: <save_location>/site/<site_name>/secrets/passphrases/ '
'directory.')
'be created automatically, if it does not already exist. The '
'generated, wrapped, and encrypted passphrases files will be saved '
'in: <save_location>/site/<site_name>/secrets/passphrases/ '
'directory.')
@click.option(
'-a',
'--author',
'author',
required=True,
help='Identifier for the program or person who is generating the secrets '
'documents')
@click.option(
'-i',
'--interactive',
'interactive',
is_flag=True,
default=False,
help='Generate passphrases interactively, not automatically')
'documents')
@click.option('-i',
'--interactive',
'interactive',
is_flag=True,
default=False,
help='Generate passphrases interactively, not automatically')
@click.option(
'--force-cleartext',
'force_cleartext',
@ -653,31 +595,30 @@ def generate():
def generate_passphrases(*, site_name, save_location, author, interactive,
force_cleartext):
engine.repository.process_repositories(site_name)
engine.secrets.generate_passphrases(
site_name, save_location, author, interactive, force_cleartext)
engine.secrets.generate_passphrases(site_name, save_location, author,
interactive, force_cleartext)
@secrets.command(
'encrypt',
help='Command to encrypt and wrap site secrets '
'documents with metadata.storagePolicy set '
'to encrypted, in pegleg managed documents.')
@secrets.command('encrypt',
help='Command to encrypt and wrap site secrets '
'documents with metadata.storagePolicy set '
'to encrypted, in pegleg managed documents.')
@click.option(
'-s',
'--save-location',
'save_location',
default=None,
help='Directory to output the encrypted site secrets files. Created '
'automatically if it does not already exist. '
'If save_location is not provided, the output encrypted files will '
'overwrite the original input files (default behavior)')
'automatically if it does not already exist. '
'If save_location is not provided, the output encrypted files will '
'overwrite the original input files (default behavior)')
@click.option(
'-a',
'--author',
'author',
required=True,
help='Identifier for the program or person who is encrypting the secrets '
'documents')
'documents')
@click.argument('site_name')
def encrypt(*, save_location, author, site_name):
engine.repository.process_repositories(site_name, overwrite_existing=True)
@ -686,23 +627,21 @@ def encrypt(*, save_location, author, site_name):
engine.secrets.encrypt(save_location, author, site_name)
@secrets.command(
'decrypt',
help='Command to unwrap and decrypt one site '
'secrets document and print it to stdout.')
@click.option(
'--path',
'path',
type=click.Path(exists=True, readable=True),
required=True,
help='The file or directory path to decrypt.')
@secrets.command('decrypt',
help='Command to unwrap and decrypt one site '
'secrets document and print it to stdout.')
@click.option('--path',
'path',
type=click.Path(exists=True, readable=True),
required=True,
help='The file or directory path to decrypt.')
@click.option(
'-s',
'--save-location',
'save_location',
default=None,
help='The destination where the decrypted file(s) should be saved. '
'If not specified, decrypted data will output to stdout.')
'If not specified, decrypted data will output to stdout.')
@click.option(
'-o',
'--overwrite',
@ -710,25 +649,23 @@ def encrypt(*, save_location, author, site_name):
is_flag=True,
default=False,
help='Overwrites original file(s) at path with decrypted data when set. '
'Overrides --save-location option.')
'Overrides --save-location option.')
@click.argument('site_name')
def decrypt(*, path, save_location, overwrite, site_name):
engine.repository.process_repositories(site_name)
decrypted = engine.secrets.decrypt(path)
if overwrite:
for key, value in decrypted.items():
files.write(key, value)
os.chmod(key, 0o600)
for path, data in decrypted.items():
files.write(path, data)
elif save_location is None:
for value in decrypted.values():
click.echo(value)
for data in decrypted.values():
click.echo(data)
else:
for key, value in decrypted.items():
file_name = os.path.split(key)[1]
for path, data in decrypted.items():
file_name = os.path.split(path)[1]
file_save_location = os.path.join(save_location, file_name)
files.write(file_save_location, value)
os.chmod(file_save_location, 0o600)
files.write(data, file_save_location)
@main.group(help='Miscellaneous generate commands')
@ -739,30 +676,27 @@ def generate():
@generate.command(
'passphrase',
help='Command to generate a passphrase and print out to stdout')
@click.option(
'-l',
'--length',
'length',
default=24,
show_default=True,
help='Generate a passphrase of the given length. '
'Length is >= 24, no maximum length.')
@click.option('-l',
'--length',
'length',
default=24,
show_default=True,
help='Generate a passphrase of the given length. '
'Length is >= 24, no maximum length.')
def generate_passphrase(length):
click.echo('Generated Passhprase: {}'.format(
engine.secrets.generate_crypto_string(length)))
@generate.command(
'salt',
help='Command to generate a salt and print out to stdout')
@click.option(
'-l',
'--length',
'length',
default=24,
show_default=True,
help='Generate a passphrase of the given length. '
'Length is >= 24, no maximum length.')
@generate.command('salt',
help='Command to generate a salt and print out to stdout')
@click.option('-l',
'--length',
'length',
default=24,
show_default=True,
help='Generate a passphrase of the given length. '
'Length is >= 24, no maximum length.')
def generate_salt(length):
click.echo("Generated Salt: {}".format(
engine.secrets.generate_crypto_string(length)))

View File

@ -34,10 +34,16 @@ except NameError:
'passphrase': None,
'salt': None,
'salt_min_length': 24,
'passphrase_min_length': 24
'passphrase_min_length': 24,
'default_umask': 0o027
}
def set_umask():
"""Set the umask for Pegleg to use when creating files/folders."""
os.umask(GLOBAL_CONTEXT['default_umask'])
def get_site_repo():
"""Get the primary site repository specified via ``-r`` CLI flag."""
return GLOBAL_CONTEXT['site_repo']

View File

@ -14,7 +14,6 @@
import logging
import os
import stat
import click
@ -64,8 +63,6 @@ def build_genesis(build_path, encryption_key, validators, debug, site_name):
# Copy the site config, and site secrets to build directory
os.mkdir(build_path)
os.chmod(build_path, os.stat(build_path).st_mode | stat.S_IRWXU |
stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH)
documents = util.definition.documents_for_site(site_name)
secret_manager = PeglegSecretManagement(docs=documents)
documents = secret_manager.get_decrypted_secrets()

View File

@ -17,8 +17,6 @@ import itertools
import logging
import os
import yaml
from pegleg import config
from pegleg.engine.catalog import pki_utility
from pegleg.engine.common import managed_document as md
@ -71,8 +69,8 @@ class PKIGenerator(object):
self._cert_to_ca_map = {}
def generate(self):
for catalog in util.catalog.iterate(
documents=self._documents, kind='PKICatalog'):
for catalog in util.catalog.iterate(documents=self._documents,
kind='PKICatalog'):
for ca_name, ca_def in catalog['data'].get(
'certificate_authorities', {}).items():
ca_cert, ca_key = self.get_or_gen_ca(ca_name)
@ -121,8 +119,10 @@ class PKIGenerator(object):
def gen_cert(self, document_name, *, ca_cert, ca_key, **kwargs):
ca_cert_data = ca_cert['data']['managedDocument']['data']
ca_key_data = ca_key['data']['managedDocument']['data']
return self.keys.generate_certificate(
document_name, ca_cert=ca_cert_data, ca_key=ca_key_data, **kwargs)
return self.keys.generate_certificate(document_name,
ca_cert=ca_cert_data,
ca_key=ca_key_data,
**kwargs)
def gen_keypair(self, document_name):
return self.keys.generate_keypair(document_name)
@ -132,8 +132,7 @@ class PKIGenerator(object):
if not docs:
docs = generator(document_name, *args, **kwargs)
else:
docs = PeglegSecretManagement(
docs=docs)
docs = PeglegSecretManagement(docs=docs)
# Adding these to output should be idempotent, so we use a dict.
@ -154,8 +153,8 @@ class PKIGenerator(object):
document_name, kinds)
return docs
else:
raise exceptions.IncompletePKIPairError(
kinds=kinds, name=document_name)
raise exceptions.IncompletePKIPairError(kinds=kinds,
name=document_name)
else:
docs = self._find_among_outputs(schemas, document_name)
@ -171,8 +170,9 @@ class PKIGenerator(object):
def _find_among_collected(self, schemas, document_name):
result = []
for schema in schemas:
doc = _find_document_by(
self._documents, schema=schema, name=document_name)
doc = _find_document_by(self._documents,
schema=schema,
name=document_name)
# If the document wasn't found, then means it needs to be
# generated.
if doc:
@ -221,26 +221,23 @@ class PKIGenerator(object):
# Encrypt the document
document['data']['managedDocument']['metadata']['storagePolicy']\
= 'encrypted'
document = PeglegSecretManagement(docs=[
document]).get_encrypted_secrets()[0][0]
document = PeglegSecretManagement(
docs=[document]).get_encrypted_secrets()[0][0]
with open(output_path, 'a') as f:
# Don't use safe_dump so we can block format certificate
# data.
yaml.dump(
document,
stream=f,
default_flow_style=False,
explicit_start=True,
indent=2)
util.files.dump(document,
output_path,
flag='a',
default_flow_style=False,
explicit_start=True,
indent=2)
output_paths.add(output_path)
return output_paths
def get_documents(self):
return list(
itertools.chain.from_iterable(
v.values() for v in self.outputs.values()))
itertools.chain.from_iterable(v.values()
for v in self.outputs.values()))
def get_host_list(service_names):
@ -288,9 +285,10 @@ def _matches_filter(document, *, schema, labels, name):
document_metadata = document['metadata']
document_labels = document_metadata.get('labels', {})
document_name = document_metadata['name']
LOG.warning('Detected deprecated unmanaged document during PKI '
'generation. Details: schema=%s, name=%s, labels=%s.',
document_schema, document_labels, document_name)
LOG.warning(
'Detected deprecated unmanaged document during PKI '
'generation. Details: schema=%s, name=%s, labels=%s.',
document_schema, document_labels, document_name)
if schema is not None and not document.get('schema',
'').startswith(schema):

View File

@ -26,6 +26,7 @@ import pytz
import yaml
from pegleg.engine import exceptions
from pegleg.engine import util
from pegleg.engine.util.catalog import decode_bytes
from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument
@ -34,7 +35,6 @@ LOG = logging.getLogger(__name__)
__all__ = ['PKIUtility']
# TODO(felipemonteiro): Create an abstract base class for other future Catalog
# classes.
@ -73,10 +73,11 @@ class PKIUtility(object):
'signing': {
'default': {
'expiry':
str(24 * self.duration) + 'h',
str(24 * self.duration) + 'h',
'usages': [
'signing', 'key encipherment', 'server auth',
'client auth'],
'client auth'
],
},
},
})
@ -91,11 +92,8 @@ class PKIUtility(object):
"""
result = self._cfssl(
['gencert', '-initca', 'csr.json'],
files={
'csr.json': self.csr(name=ca_name),
})
result = self._cfssl(['gencert', '-initca', 'csr.json'],
files={'csr.json': self.csr(name=ca_name)})
return (self._wrap_ca(ca_name, result['cert']),
self._wrap_ca_key(ca_name, result['key']))
@ -192,10 +190,8 @@ class PKIUtility(object):
"""
return self._cfssl(
['certinfo', '-cert', 'cert.pem'], files={
'cert.pem': cert,
})
return self._cfssl(['certinfo', '-cert', 'cert.pem'],
files={'cert.pem': cert})
def check_expiry(self, cert):
"""Chek whether a given certificate is expired.
@ -227,8 +223,8 @@ class PKIUtility(object):
files = {}
with tempfile.TemporaryDirectory() as tmp:
for filename, data in files.items():
with open(os.path.join(tmp, filename), 'w') as f:
f.write(decode_bytes(data))
util.files.write(decode_bytes(data),
os.path.join(tmp, filename))
# Ignore bandit false positive:
# B603:subprocess_without_shell_equals_true
@ -245,8 +241,8 @@ class PKIUtility(object):
with tempfile.TemporaryDirectory() as tmp:
for filename, data in files.items():
with open(os.path.join(tmp, filename), 'w') as f:
f.write(decode_bytes(data))
util.files.write(decode_bytes(data),
os.path.join(tmp, filename))
# Ignore bandit false positive:
# B603:subprocess_without_shell_equals_true
@ -259,33 +255,45 @@ class PKIUtility(object):
result = {}
for filename in os.listdir(tmp):
if filename not in files:
with open(os.path.join(tmp, filename)) as f:
with open(os.path.join(tmp, filename), 'r') as f:
result[filename] = f.read()
return result
def _wrap_ca(self, name, data):
return self.wrap_document(kind='CertificateAuthority', name=name,
data=data, block_strings=self.block_strings)
return self.wrap_document(kind='CertificateAuthority',
name=name,
data=data,
block_strings=self.block_strings)
def _wrap_ca_key(self, name, data):
return self.wrap_document(kind='CertificateAuthorityKey', name=name,
data=data, block_strings=self.block_strings)
return self.wrap_document(kind='CertificateAuthorityKey',
name=name,
data=data,
block_strings=self.block_strings)
def _wrap_cert(self, name, data):
return self.wrap_document(kind='Certificate', name=name, data=data,
return self.wrap_document(kind='Certificate',
name=name,
data=data,
block_strings=self.block_strings)
def _wrap_cert_key(self, name, data):
return self.wrap_document(kind='CertificateKey', name=name, data=data,
return self.wrap_document(kind='CertificateKey',
name=name,
data=data,
block_strings=self.block_strings)
def _wrap_priv_key(self, name, data):
return self.wrap_document(kind='PrivateKey', name=name, data=data,
return self.wrap_document(kind='PrivateKey',
name=name,
data=data,
block_strings=self.block_strings)
def _wrap_pub_key(self, name, data):
return self.wrap_document(kind='PublicKey', name=name, data=data,
return self.wrap_document(kind='PublicKey',
name=name,
data=data,
block_strings=self.block_strings)
@staticmethod
@ -311,8 +319,8 @@ class PKIUtility(object):
},
'storagePolicy': 'cleartext'
}
wrapped_data = PKIUtility._block_literal(
data, block_strings=block_strings)
wrapped_data = PKIUtility._block_literal(data,
block_strings=block_strings)
document = {
"schema": wrapped_schema,

View File

@ -46,10 +46,10 @@ class PassphraseGenerator(BaseGenerator):
certificates.
"""
super(PassphraseGenerator, self).__init__(
sitename, save_location, author)
self._catalog = PassphraseCatalog(
self._sitename, documents=self._documents)
super(PassphraseGenerator, self).__init__(sitename, save_location,
author)
self._catalog = PassphraseCatalog(self._sitename,
documents=self._documents)
self._pass_util = CryptoString()
def generate(self, interactive=False, force_cleartext=False):
@ -69,7 +69,7 @@ class PassphraseGenerator(BaseGenerator):
if interactive:
passphrase = getpass(
prompt="Input passphrase for {}. Leave blank to "
"auto-generate:\n".format(p_name))
"auto-generate:\n".format(p_name))
if not passphrase:
passphrase = self._pass_util.get_crypto_string(
self._catalog.get_length(p_name))
@ -86,19 +86,17 @@ class PassphraseGenerator(BaseGenerator):
else:
storage_policy = self._catalog.get_storage_policy(p_name)
docs.append(self.generate_doc(
KIND,
p_name,
storage_policy,
passphrase))
docs.append(
self.generate_doc(KIND, p_name, storage_policy, passphrase))
save_path = self.get_save_path(p_name)
if storage_policy == passphrase_catalog.P_ENCRYPTED:
PeglegSecretManagement(
docs=docs, generated=True, author=self._author,
catalog=self._catalog).encrypt_secrets(
save_path)
docs=docs,
generated=True,
author=self._author,
catalog=self._catalog).encrypt_secrets(save_path)
else:
files.write(save_path, docs)
files.write(docs, save_path)
@property
def kind_path(self):

View File

@ -217,7 +217,7 @@ def _verify_file_contents(*, sitename=None):
def _verify_single_file(filename, schemas):
errors = []
LOG.debug("Validating file %s.", filename)
with open(filename) as f:
with open(filename, 'r') as f:
if not f.read(4) == '---\n':
errors.append((FILE_MISSING_YAML_DOCUMENT_HEADER,
'%s does not begin with YAML beginning of document '

View File

@ -59,9 +59,10 @@ def encrypt(save_location, author, site_name):
secrets_found = False
for repo_base, file_path in definition.site_files_by_repo(site_name):
secrets_found = True
PeglegSecretManagement(
file_path=file_path, author=author).encrypt_secrets(
_get_dest_path(repo_base, file_path, save_location))
PeglegSecretManagement(file_path=file_path,
author=author).encrypt_secrets(
_get_dest_path(repo_base, file_path,
save_location))
if secrets_found:
LOG.info('Encryption of all secret files was completed.')
else:
@ -97,8 +98,8 @@ def decrypt(path):
match = os.path.join(path, '**', '*.yaml')
file_list = glob(match, recursive=True)
if not file_list:
LOG.warning('No YAML files were discovered in path: {}'
.format(path))
LOG.warning(
'No YAML files were discovered in path: {}'.format(path))
for file_path in file_list:
file_dict[file_path] = PeglegSecretManagement(
file_path).decrypt_secrets()
@ -131,7 +132,10 @@ def _get_dest_path(repo_base, file_path, save_location):
return file_path
def generate_passphrases(site_name, save_location, author, interactive=False,
def generate_passphrases(site_name,
save_location,
author,
interactive=False,
force_cleartext=False):
"""
Look for the site passphrase catalogs, and for every passphrase entry in
@ -146,9 +150,9 @@ def generate_passphrases(site_name, save_location, author, interactive=False,
:param bool force_cleartext: Whether to generate results in clear text
"""
PassphraseGenerator(
site_name, save_location, author).generate(
interactive=interactive, force_cleartext=force_cleartext)
PassphraseGenerator(site_name, save_location,
author).generate(interactive=interactive,
force_cleartext=force_cleartext)
def generate_crypto_string(length):
@ -162,8 +166,7 @@ def generate_crypto_string(length):
return CryptoString().get_crypto_string(length)
def wrap_secret(author, filename, output_path, schema,
name, layer, encrypt):
def wrap_secret(author, filename, output_path, schema, name, layer, encrypt):
"""Wrap a bare secrets file in a YAML and ManagedDocument.
:param author: author for ManagedDocument
@ -178,7 +181,7 @@ def wrap_secret(author, filename, output_path, schema,
if not output_path:
output_path = os.path.splitext(filename)[0] + ".yaml"
with open(filename, "r") as in_fi:
with open(filename, 'r') as in_fi:
data = in_fi.read()
inner_doc = {
@ -200,8 +203,7 @@ def wrap_secret(author, filename, output_path, schema,
output_doc = psm.get_encrypted_secrets()[0][0]
else:
output_doc = managed_secret.pegleg_document
with open(output_path, "w") as output_fi:
yaml.safe_dump(output_doc, output_fi)
files.safe_dump(output_doc, output_path)
def check_cert_expiry(site_name, duration=60):

View File

@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
def _read_and_format_yaml(filename):
with open(filename) as f:
with open(filename, 'r') as f:
lines_to_write = f.readlines()
if lines_to_write[0] != '---\n':
lines_to_write = ['---\n'] + lines_to_write
@ -78,12 +78,14 @@ def _collect_to_file(site_name, save_location):
repo_name = os.path.normpath(repo_base).split(os.sep)[-1]
save_file = os.path.join(save_location, repo_name + '.yaml')
if repo_name not in save_files:
save_files[repo_name] = open(save_file, "w")
save_files[repo_name] = open(save_file, 'w')
LOG.debug("Collecting file %s to file %s", filename, save_file)
save_files[repo_name].writelines(_read_and_format_yaml(filename))
save_files[curr_site_repo].writelines(yaml.safe_dump(
_get_deployment_data_doc(), default_flow_style=False,
explicit_start=True, explicit_end=True))
save_files[curr_site_repo].writelines(
yaml.safe_dump(_get_deployment_data_doc(),
default_flow_style=False,
explicit_start=True,
explicit_end=True))
except Exception as ex:
raise click.ClickException("Error saving output: %s" % str(ex))
finally:
@ -104,7 +106,7 @@ def render(site_name, output_stream, validate):
SafeConstructor.add_multi_constructor(
'', lambda loader, suffix, node: None)
for filename in util.definition.site_files(site_name):
with open(filename) as f:
with open(filename, 'r') as f:
documents.extend(list(yaml.safe_load_all(f)))
rendered_documents, errors = util.deckhand.deckhand_render(
@ -117,12 +119,19 @@ def render(site_name, output_stream, validate):
else:
err_msg += str(err) + '\n'
raise click.ClickException(err_msg)
yaml.dump_all(
rendered_documents,
output_stream,
default_flow_style=False,
explicit_start=True,
explicit_end=True)
if output_stream:
files.dump_all(rendered_documents,
output_stream,
default_flow_style=False,
explicit_start=True,
explicit_end=True)
else:
yaml.dump_all(rendered_documents,
output_stream,
default_flow_style=False,
explicit_start=True,
explicit_end=True)
def list_(output_stream):
@ -137,7 +146,11 @@ def list_(output_stream):
params = util.definition.load_as_params(site_name, *field_names)
site_table.add_row(list(map(lambda k: params[k], field_names)))
# Write table to specified output_stream
output_stream.write(site_table.get_string() + "\n")
msg = site_table.get_string()
if output_stream:
files.write(msg + "\n", output_stream)
else:
click.echo(msg)
def show(site_name, output_stream):
@ -157,12 +170,18 @@ def show(site_name, output_stream):
site_table.add_row(
["", data['site_name'], data['site_type'], file])
# Write tables to specified output_stream
output_stream.write(site_table.get_string() + "\n")
msg = site_table.get_string()
if output_stream:
files.write(msg + "\n", output_stream)
else:
click.echo(msg)
def _get_deployment_data_doc():
stanzas = {files.path_leaf(repo): _get_repo_deployment_data_stanza(repo)
for repo in config.all_repos()}
stanzas = {
files.path_leaf(repo): _get_repo_deployment_data_stanza(repo)
for repo in config.all_repos()
}
return {
"schema": "pegleg/DeploymentData/v1",
"metadata": {
@ -186,8 +205,7 @@ def _get_repo_deployment_data_stanza(repo_path):
commit = repo.commit()
# If we're at a particular tag, reference it
tag = [tag.name for tag in
repo.tags if tag.commit == commit]
tag = [tag.name for tag in repo.tags if tag.commit == commit]
if tag:
tag == ", ".join(tag)
else:
@ -199,14 +217,6 @@ def _get_repo_deployment_data_stanza(repo_path):
tag = "Detached HEAD"
else:
raise e
return {
"commit": commit.hexsha,
"tag": tag,
"dirty": repo.is_dirty()
}
return {"commit": commit.hexsha, "tag": tag, "dirty": repo.is_dirty()}
except git.InvalidGitRepositoryError:
return {
"commit": "None",
"tag": "None",
"dirty": "None"
}
return {"commit": "None", "tag": "None", "dirty": "None"}

View File

@ -14,9 +14,11 @@
import logging
import click
from prettytable import PrettyTable
from pegleg.engine import util
from pegleg.engine.util import files
__all__ = ('list_types', )
@ -32,4 +34,8 @@ def list_types(output_stream):
for type_name in util.files.list_types():
type_table.add_row([type_name])
# Write table to specified output_stream
output_stream.write(type_table.get_string() + "\n")
msg = type_table.get_string()
if output_stream:
files.write(msg + "\n", output_stream)
else:
click.echo(msg)

View File

@ -34,6 +34,8 @@ __all__ = [
'directories_for',
'directory_for',
'dump',
'safe_dump',
'dump_all',
'read',
'write',
'existing_directories',
@ -116,7 +118,7 @@ FULL_STRUCTURE = {
def _create_tree(root_path, *, tree=FULL_STRUCTURE):
for name, data in tree.get('directories', {}).items():
path = os.path.join(root_path, name)
os.makedirs(path, mode=0o775, exist_ok=True)
os.makedirs(path, exist_ok=True)
_create_tree(path, tree=data)
for filename, yaml_data in tree.get('files', {}).items():
@ -226,7 +228,7 @@ def slurp(path):
'%s not found. Pegleg must be run from the root of a configuration'
' repository.' % path)
with open(path) as f:
with open(path, 'r') as f:
try:
# Ignore YAML tags, only construct dicts
SafeConstructor.add_multi_constructor(
@ -236,14 +238,34 @@ def slurp(path):
raise click.ClickException('Failed to parse %s:\n%s' % (path, e))
def dump(path, data):
if os.path.exists(path):
def dump(data, path, flag='w', **kwargs):
if flag == 'w' and os.path.exists(path):
raise click.ClickException('%s already exists, aborting' % path)
os.makedirs(os.path.dirname(path), mode=0o775, exist_ok=True)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, flag) as f:
with open(path, 'w') as f:
yaml.dump(data, f, explicit_start=True)
yaml.dump(data, f, **kwargs)
def safe_dump(data, path, flag='w', **kwargs):
if flag == 'w' and os.path.exists(path):
raise click.ClickException('%s already exists, aborting' % path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, flag) as f:
yaml.safe_dump(data, f, **kwargs)
def dump_all(data, path, flag='w', **kwargs):
if flag == 'w' and os.path.exists(path):
raise click.ClickException('%s already exists, aborting' % path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, flag) as f:
yaml.dump_all(data, f, **kwargs)
def read(path):
@ -272,29 +294,29 @@ def read(path):
if any(schema.startswith(x) for x in valid_schemas):
return True
else:
LOG.debug('Document with schema=%s is not a valid Deckhand '
'schema. Ignoring it.', schema)
LOG.debug(
'Document with schema=%s is not a valid Deckhand '
'schema. Ignoring it.', schema)
return False
def is_pegleg_managed_document(document):
return md.PeglegManagedSecretsDocument.is_pegleg_managed_secret(
document)
with open(path) as stream:
with open(path, 'r') as stream:
# Ignore YAML tags, only construct dicts
SafeConstructor.add_multi_constructor(
'', lambda loader, suffix, node: None)
try:
return [
d for d in yaml.safe_load_all(stream)
if d and (is_deckhand_document(d) or
is_pegleg_managed_document(d))
d for d in yaml.safe_load_all(stream) if d and
(is_deckhand_document(d) or is_pegleg_managed_document(d))
]
except yaml.YAMLError as e:
raise click.ClickException('Failed to parse %s:\n%s' % (path, e))
def write(file_path, data):
def write(data, file_path):
"""
Write the data to destination file_path.
@ -306,28 +328,25 @@ def write(file_path, data):
:param data: data to be written to the destination file
:type data: str, dict, or a list of dicts
"""
try:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w') as stream:
if isinstance(data, str):
stream.write(data)
elif isinstance(data, (dict, collections.abc.Iterable)):
if isinstance(data, dict):
data = [data]
yaml.safe_dump_all(
data,
stream,
explicit_start=True,
explicit_end=True,
default_flow_style=False)
yaml.safe_dump_all(data,
stream,
explicit_start=True,
explicit_end=True,
default_flow_style=False)
else:
raise ValueError('data must be str or dict, '
'not {}'.format(type(data)))
except EnvironmentError as e:
raise click.ClickError(
"Couldn't write data to {}: {}".format(file_path, e))
raise click.ClickError("Couldn't write data to {}: {}".format(
file_path, e))
def _recurse_subdirs(search_path, depth):
@ -339,8 +358,8 @@ def _recurse_subdirs(search_path, depth):
if depth == 1:
directories.add(joined_path)
else:
directories.update(
_recurse_subdirs(joined_path, depth - 1))
directories.update(_recurse_subdirs(
joined_path, depth - 1))
except FileNotFoundError:
pass
return directories
@ -382,8 +401,9 @@ def check_file_save_location(save_location):
if save_location:
if not os.path.exists(save_location):
LOG.debug("Save location %s does not exist. Creating "
"automatically.", save_location)
LOG.debug(
"Save location %s does not exist. Creating "
"automatically.", save_location)
os.makedirs(save_location)
# In case save_location already exists and isn't a directory.
if not os.path.isdir(save_location):
@ -396,8 +416,7 @@ def collect_files_by_repo(site_name):
"""Collects file by repo name in memory."""
collected_files_by_repo = collections.defaultdict(list)
for repo_base, filename in util.definition.site_files_by_repo(
site_name):
for repo_base, filename in util.definition.site_files_by_repo(site_name):
repo_name = os.path.normpath(repo_base).split(os.sep)[-1]
documents = util.files.read(filename)
collected_files_by_repo[repo_name].extend(documents)
@ -411,8 +430,7 @@ def file_in_subdir(filename, _dir):
:return: Whether _dir is a parent of the file
:rtype: bool
"""
file_path, filename = os.path.split(
os.path.realpath(filename))
file_path, filename = os.path.split(os.path.realpath(filename))
return _dir in file_path.split(os.path.sep)
@ -425,8 +443,7 @@ def path_leaf(path):
:return: the last non-empty element of a string
:rtype: str
"""
split_path = [i for i in path.split(os.sep)
if i]
split_path = [i for i in path.split(os.sep) if i]
if split_path:
return split_path[-1]
else:

View File

@ -100,7 +100,7 @@ class PeglegSecretManagement(object):
doc_list, encrypted_docs = self.get_encrypted_secrets()
if encrypted_docs:
files.write(save_path, doc_list)
files.write(doc_list, save_path)
click.echo('Wrote encrypted data to: {}'.format(save_path))
else:
LOG.debug('All documents in file: {} are either already encrypted '

View File

@ -87,10 +87,11 @@ data: ABAgagajajkb839215387
"""
@mock.patch.dict(os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_no_encryption_key(temp_path):
# Write the test data to temp file
config_data = list(yaml.safe_load_all(SITE_CONFIG_DATA))
@ -102,9 +103,9 @@ def test_no_encryption_key(temp_path):
build_dir = os.path.join(temp_path, 'build_dir')
os.makedirs(config_dir)
files.write(config_path, config_data)
files.write(os.path.join(config_dir, "site-definition.yaml"),
yaml.safe_load_all(SITE_DEFINITION))
files.write(config_data, config_path)
files.write(yaml.safe_load_all(SITE_DEFINITION),
os.path.join(config_dir, "site-definition.yaml"))
with pytest.raises(GenesisBundleEncryptionException,
match=r'.*no encryption policy or key is specified.*'):
@ -115,10 +116,11 @@ def test_no_encryption_key(temp_path):
site_name="test_site")
@mock.patch.dict(os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
@mock.patch.dict(
os.environ, {
'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC',
'PEGLEG_SALT': 'MySecretSalt1234567890]['
})
def test_failed_deckhand_validation(temp_path):
# Write the test data to temp file
config_data = list(yaml.safe_load_all(SITE_CONFIG_DATA))
@ -129,9 +131,9 @@ def test_failed_deckhand_validation(temp_path):
config_path = os.path.join(config_dir, 'config_file.yaml')
build_dir = os.path.join(temp_path, 'build_dir')
os.makedirs(config_dir)
files.write(config_path, config_data)
files.write(os.path.join(config_dir, "site-definition.yaml"),
yaml.safe_load_all(SITE_DEFINITION))
files.write(config_data, config_path)
files.write(yaml.safe_load_all(SITE_DEFINITION),
os.path.join(config_dir, "site-definition.yaml"))
key = 'MyverYSecretEncryptionKey382803'
with pytest.raises(GenesisBundleGenerateException,
match=r'.*failed on deckhand validation.*'):

View File

@ -201,7 +201,7 @@ def test_encrypt_decrypt_using_file_path(temp_path):
# write the test data to temp file
test_data = list(yaml.safe_load_all(TEST_DATA))
file_path = os.path.join(temp_path, 'secrets_file.yaml')
files.write(file_path, test_data)
files.write(test_data, file_path)
save_path = os.path.join(temp_path, 'encrypted_secrets_file.yaml')
# encrypt documents and validate that they were encrypted

View File

@ -22,6 +22,9 @@ from pegleg.engine.util import files
from tests.unit.fixtures import create_tmp_deployment_files
from tests.unit.fixtures import temp_path
EXPECTED_FILE_PERM = '0o640'
EXPECTED_DIR_PERM = '0o750'
class TestFileHelpers(object):
def test_read_compatible_file(self, create_tmp_deployment_files):
@ -44,20 +47,31 @@ class TestFileHelpers(object):
def test_write(self, create_tmp_deployment_files):
path = os.path.join(config.get_site_repo(), 'site', 'cicd',
'test_out.yaml')
files.write(path, "test text")
files.write("test text", path)
with open(path, "r") as out_fi:
assert out_fi.read() == "test text"
files.write(path, {"a": 1})
files.write({"a": 1}, path)
with open(path, "r") as out_fi:
assert yaml.safe_load(out_fi) == {"a": 1}
files.write(path, [{"a": 1}])
files.write([{"a": 1}], path)
with open(path, "r") as out_fi:
assert list(yaml.safe_load_all(out_fi)) == [{"a": 1}]
with pytest.raises(ValueError) as _:
files.write(path, object())
files.write(object(), path)
def test_file_permissions(self, create_tmp_deployment_files):
path = os.path.join(config.get_site_repo(), 'site', 'cicd',
'test_out.yaml')
files.write("test text", path)
assert oct(os.stat(path).st_mode & 0o777) == EXPECTED_FILE_PERM
def test_dir_permissions(self, create_tmp_deployment_files):
path = os.path.join(config.get_site_repo(), 'site', 'cicd', 'test_dir')
os.makedirs(path)
assert oct(os.stat(path).st_mode & 0o777) == EXPECTED_DIR_PERM
def test_file_in_subdir():

View File

@ -45,9 +45,9 @@ DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF
-----END CERTIFICATE-----
"""
@pytest.mark.skipif(
not test_utils.is_connected(),
reason='git clone requires network connectivity.')
@pytest.mark.skipif(not test_utils.is_connected(),
reason='git clone requires network connectivity.')
class BaseCLIActionTest(object):
"""Tests end-to-end flows for all Pegleg CLI actions, with minimal mocking.
@ -94,7 +94,7 @@ class TestSiteCLIOptions(BaseCLIActionTest):
# location if `clone_path` is set)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
# Note that the -p option is used to specify the clone_folder
site_list = self.runner.invoke(
@ -144,7 +144,7 @@ class TestSiteCLIOptionsNegative(BaseCLIActionTest):
# location if `clone_path` is set)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
# Note that the -p option is used to specify the clone_folder
site_list = self.runner.invoke(
@ -192,7 +192,7 @@ class TestSiteCliActions(BaseCLIActionTest):
# 3) Check that expected file name is there
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
self._validate_collect_site_action(repo_url, temp_path)
def test_collect_using_remote_repo_url_ending_with_dot_git(
@ -205,7 +205,7 @@ class TestSiteCliActions(BaseCLIActionTest):
# 3) Check that expected file name is there
repo_url = 'https://opendev.org/airship/%s@%s.git' % (self.repo_name,
self.repo_rev)
self.repo_rev)
self._validate_collect_site_action(repo_url, temp_path)
def test_collect_using_local_path(self, temp_path):
@ -252,7 +252,7 @@ class TestSiteCliActions(BaseCLIActionTest):
# 2) Lint site with exclude flags (should clone repo automatically)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
self._test_lint_site_action(repo_url, exclude=True)
def test_lint_site_using_local_path_with_exclude(self):
@ -277,66 +277,68 @@ class TestSiteCliActions(BaseCLIActionTest):
### List tests ###
def _validate_list_site_action(self, repo_path_or_url):
mock_output = mock.Mock()
def _validate_list_site_action(self, repo_path_or_url, temp_path):
mock_output = os.path.join(temp_path, 'output')
result = self.runner.invoke(
cli.site, ['-r', repo_path_or_url, 'list', '-o', mock_output])
assert result.exit_code == 0, result.output
table_output = mock_output.write.mock_calls[0][1][0]
with open(mock_output, 'r') as f:
table_output = f.read()
assert self.site_name in table_output
assert self.site_type in table_output
def test_list_sites_using_remote_repo_url(self):
def test_list_sites_using_remote_repo_url(self, temp_path):
"""Validates list action using remote repo URL."""
# Scenario:
#
# 1) List sites (should clone repo automatically)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
self._validate_list_site_action(repo_url)
self._validate_list_site_action(repo_url, temp_path)
def test_list_sites_using_local_path(self):
def test_list_sites_using_local_path(self, temp_path):
"""Validates list action using local repo path."""
# Scenario:
#
# 1) List sites (should skip clone repo)
repo_path = self.treasuremap_path
self._validate_list_site_action(repo_path)
self._validate_list_site_action(repo_path, temp_path)
### Show tests ###
def _validate_site_show_action(self, repo_path_or_url):
mock_output = mock.Mock()
def _validate_site_show_action(self, repo_path_or_url, temp_path):
mock_output = os.path.join(temp_path, 'output')
result = self.runner.invoke(cli.site, [
'-r', repo_path_or_url, 'show', self.site_name, '-o', mock_output
])
assert result.exit_code == 0, result.output
table_output = mock_output.write.mock_calls[0][1][0]
with open(mock_output, 'r') as f:
table_output = f.read()
assert self.site_name in table_output
def test_show_site_using_remote_repo_url(self):
def test_show_site_using_remote_repo_url(self, temp_path):
"""Validates show action using remote repo URL."""
# Scenario:
#
# 1) Show site (should clone repo automatically)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self._validate_site_show_action(repo_url)
self.repo_rev)
self._validate_site_show_action(repo_url, temp_path)
def test_show_site_using_local_path(self):
def test_show_site_using_local_path(self, temp_path):
"""Validates show action using local repo path."""
# Scenario:
#
# 1) Show site (should skip clone repo)
repo_path = self.treasuremap_path
self._validate_site_show_action(repo_path)
self._validate_site_show_action(repo_path, temp_path)
### Render tests ###
@ -360,7 +362,7 @@ class TestSiteCliActions(BaseCLIActionTest):
# 2) Render site (should clone repo automatically)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
self._validate_render_site_action(repo_url)
def test_render_site_using_local_path(self):
@ -385,10 +387,10 @@ class TestSiteCliActions(BaseCLIActionTest):
repo_path = self.treasuremap_path
with mock.patch('pegleg.cli.ShipyardHelper') as mock_obj:
result = self.runner.invoke(cli.site,
['-r', repo_path, 'upload',
self.site_name, '--collection',
'collection'])
result = self.runner.invoke(cli.site, [
'-r', repo_path, 'upload', self.site_name, '--collection',
'collection'
])
assert result.exit_code == 0
mock_obj.assert_called_once()
@ -403,9 +405,8 @@ class TestSiteCliActions(BaseCLIActionTest):
repo_path = self.treasuremap_path
with mock.patch('pegleg.cli.ShipyardHelper') as mock_obj:
result = self.runner.invoke(cli.site,
['-r', repo_path, 'upload',
self.site_name])
result = self.runner.invoke(
cli.site, ['-r', repo_path, 'upload', self.site_name])
assert result.exit_code == 0
mock_obj.assert_called_once()
@ -435,7 +436,7 @@ class TestRepoCliActions(BaseCLIActionTest):
# 2) Lint repo with exclude flags (should clone repo automatically)
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
lint_command = ['-r', repo_url, 'lint']
exclude_lint_command = [
@ -484,10 +485,11 @@ class TestSiteSecretsActions(BaseCLIActionTest):
@classmethod
def setup_class(cls):
super(TestSiteSecretsActions, cls).setup_class()
cls.runner = CliRunner(env={
"PEGLEG_PASSPHRASE": 'ytrr89erARAiPE34692iwUMvWqqBvC',
"PEGLEG_SALT": "MySecretSalt1234567890]["
})
cls.runner = CliRunner(
env={
"PEGLEG_PASSPHRASE": 'ytrr89erARAiPE34692iwUMvWqqBvC',
"PEGLEG_SALT": "MySecretSalt1234567890]["
})
def _validate_generate_pki_action(self, result):
assert result.exit_code == 0
@ -504,9 +506,9 @@ class TestSiteSecretsActions(BaseCLIActionTest):
result = yaml.safe_load_all(f) # Validate valid YAML.
assert list(result), "%s file is empty" % generated_file
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
@pytest.mark.skipif(not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests'
)
def test_site_secrets_generate_pki_using_remote_repo_url(self):
"""Validates ``generate-pki`` action using remote repo URL."""
# Scenario:
@ -514,16 +516,16 @@ class TestSiteSecretsActions(BaseCLIActionTest):
# 1) Generate PKI using remote repo URL
repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
self.repo_rev)
self.repo_rev)
secrets_opts = ['secrets', 'generate-pki', self.site_name]
result = self.runner.invoke(cli.site, ['-r', repo_url] + secrets_opts)
self._validate_generate_pki_action(result)
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
@pytest.mark.skipif(not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests'
)
def test_site_secrets_generate_pki_using_local_repo_path(self):
"""Validates ``generate-pki`` action using local repo path."""
# Scenario:
@ -536,13 +538,14 @@ class TestSiteSecretsActions(BaseCLIActionTest):
result = self.runner.invoke(cli.site, ['-r', repo_path] + secrets_opts)
self._validate_generate_pki_action(result)
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
@mock.patch.dict(os.environ, {
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
"PEGLEG_SALT": "MySecretSalt1234567890]["
})
@pytest.mark.skipif(not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests'
)
@mock.patch.dict(
os.environ, {
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
"PEGLEG_SALT": "MySecretSalt1234567890]["
})
def test_site_secrets_encrypt_and_decrypt_local_repo_path(self):
"""Validates ``generate-pki`` action using local repo path."""
# Scenario:
@ -551,7 +554,7 @@ class TestSiteSecretsActions(BaseCLIActionTest):
repo_path = self.treasuremap_path
file_path = os.path.join(repo_path, "site", "airship-seaworthy",
"secrets", "passphrases", "ceph_fsid.yaml")
"secrets", "passphrases", "ceph_fsid.yaml")
with open(file_path, "r") as ceph_fsid_fi:
ceph_fsid = yaml.safe_load(ceph_fsid_fi)
ceph_fsid["metadata"]["storagePolicy"] = "encrypted"
@ -572,24 +575,26 @@ class TestSiteSecretsActions(BaseCLIActionTest):
assert "encrypted" in ceph_fsid["data"]
assert "managedDocument" in ceph_fsid["data"]
secrets_opts = ['secrets', 'decrypt', '--path', file_path,
self.site_name]
secrets_opts = [
'secrets', 'decrypt', '--path', file_path, self.site_name
]
result = self.runner.invoke(cli.site, ['-r', repo_path] + secrets_opts)
assert result.exit_code == 0, result.output
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
@pytest.mark.skipif(not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests'
)
def test_check_pki_certs(self):
repo_path = self.treasuremap_path
secrets_opts = ['secrets', 'check-pki-certs', self.site_name]
result = self.runner.invoke(cli.site, ['-r', repo_path] + secrets_opts)
assert result.exit_code == 0, result.output
@mock.patch.dict(os.environ, {
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
"PEGLEG_SALT": "123456"
})
@mock.patch.dict(
os.environ, {
"PEGLEG_PASSPHRASE": "123456789012345678901234567890",
"PEGLEG_SALT": "123456"
})
# NOTE(review): this span is a stripped unified diff — the "@ -604,26 ..."
# hunk headers below are diff residue, and both the pre-change and
# post-change variants of several statements appear back to back without
# +/- markers.  It is not runnable as-is; kept byte-identical pending a
# re-fetch of the real file.
def test_site_secrets_wrap(self):
"""Validates ``secrets wrap`` action using local repo path."""
# Scenario:
# (hunk header: lines omitted from this view — the omitted span sets up
# repo_path, file_path, test_cert and output_path used below)
@ -604,26 +609,32 @@ class TestSiteSecretsActions(BaseCLIActionTest):
# Write the test certificate to disk so the CLI can wrap it.
with open(file_path, "w") as test_crt_fi:
test_crt_fi.write(test_cert)
# Pre-change variant of the CLI options (removed side of the diff).
secrets_opts = ['secrets', 'wrap', "-a", "lm734y", "--filename",
file_path, "-s", "deckhand/Certificate/v1",
"-n", "test-certificate", "-l", "site", "--no-encrypt",
self.site_name]
# Post-change variant: same options, reflowed to the project style.
secrets_opts = [
'secrets', 'wrap', "-a", "lm734y", "--filename", file_path, "-s",
"deckhand/Certificate/v1", "-n", "test-certificate", "-l", "site",
"--no-encrypt", self.site_name
]
result = self.runner.invoke(cli.site, ["-r", repo_path] + secrets_opts)
assert result.exit_code == 0
# Wrapped document is read back and its managed-document fields checked.
with open(output_path, "r") as output_fi:
doc = yaml.safe_load(output_fi)
assert doc["data"]["managedDocument"]["data"] == test_cert
# Pre-change variant of the assertions (removed side of the diff).
assert doc["data"]["managedDocument"]["schema"] == "deckhand/Certificate/v1"
assert doc["data"]["managedDocument"]["metadata"]["name"] == "test-certificate"
assert doc["data"]["managedDocument"]["metadata"]["layeringDefinition"]["layer"] == "site"
assert doc["data"]["managedDocument"]["metadata"]["storagePolicy"] == "cleartext"
# Post-change variant: same assertions, wrapped to fit the line limit.
assert doc["data"]["managedDocument"][
"schema"] == "deckhand/Certificate/v1"
assert doc["data"]["managedDocument"]["metadata"][
"name"] == "test-certificate"
assert doc["data"]["managedDocument"]["metadata"][
"layeringDefinition"]["layer"] == "site"
assert doc["data"]["managedDocument"]["metadata"][
"storagePolicy"] == "cleartext"
os.remove(output_path)
# Second invocation: wrap again with an explicit output path and
# without --no-encrypt.  Pre-change variant first:
secrets_opts = ['secrets', 'wrap', "-a", "lm734y", "--filename", file_path,
"-o", output_path, "-s", "deckhand/Certificate/v1",
"-n", "test-certificate", "-l", "site",
self.site_name]
# Post-change variant of the same options:
secrets_opts = [
'secrets', 'wrap', "-a", "lm734y", "--filename", file_path, "-o",
output_path, "-s", "deckhand/Certificate/v1", "-n",
"test-certificate", "-l", "site", self.site_name
]
result = self.runner.invoke(cli.site, ["-r", repo_path] + secrets_opts)
assert result.exit_code == 0
# (hunk header: lines omitted from this view — the omitted span re-reads
# the output document into ``doc`` before the assertions below)
@ -632,43 +643,45 @@ class TestSiteSecretsActions(BaseCLIActionTest):
assert "encrypted" in doc["data"]
assert "managedDocument" in doc["data"]
class TestTypeCliActions(BaseCLIActionTest):
    """Tests type-level CLI actions."""

    # NOTE(review): the scraped original interleaved the pre-change
    # (mock-based) and post-change (tempfile-based) variants of these
    # methods, including a stray continuation line that was a syntax
    # error; this is the reconstructed post-change version in which the
    # ``type list`` table is written to a real file and read back.

    def setup(self):
        # Type names expected in the treasuremap fixture repository.
        self.expected_types = ['foundry']

    def _assert_table_has_expected_sites(self, table_output):
        """Asserts every expected type name appears in the listing table.

        :param table_output: str contents of the table ``type list`` wrote.
        """
        for expected_type in self.expected_types:
            assert expected_type in table_output

    def _validate_type_list_action(self, repo_path_or_url, temp_path):
        """Runs ``type list -o <file>`` and validates the table it writes.

        :param repo_path_or_url: local path or remote URL of the site repo.
        :param temp_path: temp directory the output table is written into.
        """
        mock_output = os.path.join(temp_path, 'output')
        result = self.runner.invoke(
            cli.type, ['-r', repo_path_or_url, 'list', '-o', mock_output])
        with open(mock_output, 'r') as f:
            table_output = f.read()

        assert result.exit_code == 0, result.output
        self._assert_table_has_expected_sites(table_output)

    def test_list_types_using_remote_repo_url(self, temp_path):
        """Validates list types action using remote repo URL."""
        # Scenario:
        #
        # 1) List types (should clone repo automatically)

        repo_url = 'https://opendev.org/airship/%s@%s' % (self.repo_name,
                                                          self.repo_rev)

        self._validate_type_list_action(repo_url, temp_path)

    def test_list_types_using_local_repo_path(self, temp_path):
        """Validates list types action using local repo path."""
        # Scenario:
        #
        # 1) List types for local repo path

        repo_path = self.treasuremap_path
        self._validate_type_list_action(repo_path, temp_path)
class TestSiteCliActionsWithSubdirectory(BaseCLIActionTest):
# NOTE(review): the "@ -677,20 ..." line below is unified-diff residue —
# the class docstring and any leading members were omitted from this view.
@ -677,20 +690,22 @@ class TestSiteCliActionsWithSubdirectory(BaseCLIActionTest):
def setup(self):
# Site names expected in the deployment_files fixture repository.
self.expected_sites = ['demo', 'gate-multinode', 'dev', 'dev-proxy']
def _assert_table_has_expected_sites(self, mock_output):
table_output = mock_output.write.mock_calls[0][1][0]
def _assert_table_has_expected_sites(self, table_output):
for expected_site in self.expected_sites:
assert expected_site in table_output
def _validate_list_site_action(self, repo_path_or_url):
mock_output = mock.Mock()
def _validate_list_site_action(self, repo_path_or_url, temp_path):
mock_output = os.path.join(temp_path, 'output')
result = self.runner.invoke(
cli.site, ['-r', repo_path_or_url, 'list', '-o', mock_output])
assert result.exit_code == 0, result.output
self._assert_table_has_expected_sites(mock_output)
with open(mock_output, 'r') as f:
table_output = f.read()
def test_site_action_with_subpath_in_remote_url(self):
assert result.exit_code == 0, result.output
self._assert_table_has_expected_sites(table_output)
def test_site_action_with_subpath_in_remote_url(self, temp_path):
"""Validates list action with subpath in remote URL."""
# Scenario:
#
# NOTE(review): the hunk header below is unified-diff residue — the
# omitted span assigns ``repo_name`` and ``repo_rev`` used in the URL
# format below; cannot reconstruct it from this view.
@ -703,9 +718,9 @@ class TestSiteCliActionsWithSubdirectory(BaseCLIActionTest):
repo_url = 'https://opendev.org/airship/%s/deployment_files@%s' % (
repo_name, repo_rev)
# Pre-change call (removed side of the diff) followed by the
# post-change call that threads ``temp_path`` through.
self._validate_list_site_action(repo_url)
self._validate_list_site_action(repo_url, temp_path)
# Pre-change signature (removed side of the diff) followed by the
# post-change signature that accepts the ``temp_path`` fixture.
def test_site_action_with_subpath_in_local_repo_path(self):
def test_site_action_with_subpath_in_local_repo_path(self, temp_path):
"""Validates list action with subpath in local repo path."""
# Scenario:
#
# NOTE(review): the hunk header below is unified-diff residue — the
# omitted span assigns ``repo_url`` and ``repo_rev`` used below; cannot
# reconstruct it from this view.
@ -719,4 +734,4 @@ class TestSiteCliActionsWithSubdirectory(BaseCLIActionTest):
_repo_path = git.git_handler(repo_url, ref=repo_rev)
repo_path = os.path.join(_repo_path, 'deployment_files')
# Pre-change call (removed side of the diff) followed by the
# post-change call that threads ``temp_path`` through.
self._validate_list_site_action(repo_path)
self._validate_list_site_action(repo_path, temp_path)