pegleg/pegleg/engine/lint.py

357 lines
13 KiB
Python

# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
import logging
import os
import pkg_resources
import shutil
import textwrap
from prettytable import PrettyTable
from pegleg import config
from pegleg.engine.errorcodes import DOCUMENT_LAYER_MISMATCH
from pegleg.engine.errorcodes import FILE_CONTAINS_INVALID_YAML
from pegleg.engine.errorcodes import FILE_MISSING_YAML_DOCUMENT_HEADER
from pegleg.engine.errorcodes import REPOS_MISSING_DIRECTORIES_FLAG
from pegleg.engine.errorcodes import SCHEMA_STORAGE_POLICY_MISMATCH_FLAG
from pegleg.engine.errorcodes import SECRET_NOT_ENCRYPTED_POLICY
from pegleg.engine import util
__all__ = ['full']
LOG = logging.getLogger(__name__)
DECKHAND_SCHEMAS = {
'root': 'schemas/deckhand-root.yaml',
'metadata/Control/v1': 'schemas/deckhand-metadata-control.yaml',
'metadata/Document/v1': 'schemas/deckhand-metadata-document.yaml',
}
def full(fail_on_missing_sub_src=False, exclude_lint=None, warn_lint=None):
"""Lint all sites in a repository.
:param bool fail_on_missing_sub_src: Whether to allow Deckhand rendering
to fail in the absence of a missing substitution source document which
might be the case in "offline mode".
:param list exclude_lint: List of lint rules to exclude. See those
defined in :mod:`pegleg.engine.errorcodes`.
:param list warn_lint: List of lint rules to warn about. See those
defined in :mod:`pegleg.engine.errorcodes`.
:raises ClickException: If a lint check was caught and it isn't contained
in ``exclude_lint`` or ``warn_lint``.
:returns: List of warnings produced, if any.
"""
exclude_lint = exclude_lint or []
warn_lint = warn_lint or []
messages = []
# If policy is cleartext and error is added this will put
# that particular message into the warns list and all others will
# be added to the error list if SCHEMA_STORAGE_POLICY_MISMATCH_FLAG
messages.extend(_verify_file_contents())
# FIXME(felipemonteiro): Now that we are using revisioned repositories
# instead of flat directories with subfolders mirroring "revisions",
# this lint check analyzes ALL the directories (including these
# no-longer-valid "revision directories") against the new subset of
# relevant directories. We need to rewrite this check so that it works
# after site definitions have been refactored to move the manifests
# under fake repository folders into the common/ folders.
#
# messages.extend(_verify_no_unexpected_files())
# Deckhand rendering completes without error
messages.extend(
_verify_deckhand_render(
fail_on_missing_sub_src=fail_on_missing_sub_src))
return _filter_messages_by_warn_and_error_lint(
messages=messages, exclude_lint=exclude_lint, warn_lint=warn_lint)
def site(site_name,
fail_on_missing_sub_src=False,
exclude_lint=None,
warn_lint=None):
"""Lint ``site_name``.
:param str site_name: Name of site to lint.
:param bool fail_on_missing_sub_src: Whether to allow Deckhand rendering
to fail in the absence of a missing substitution source document which
might be the case in "offline mode".
:param list exclude_lint: List of lint rules to exclude. See those
defined in :mod:`pegleg.engine.errorcodes`.
:param list warn_lint: List of lint rules to warn about. See those
defined in :mod:`pegleg.engine.errorcodes`.
:raises ClickException: If a lint check was caught and it isn't contained
in ``exclude_lint`` or ``warn_lint``.
:returns: List of warnings produced, if any.
"""
exclude_lint = exclude_lint or []
warn_lint = warn_lint or []
messages = []
# FIXME(felipemonteiro): Now that we are using revisioned repositories
# instead of flat directories with subfolders mirroring "revisions",
# this lint check analyzes ALL the directories (including these
# no-longer-valid "revision directories") against the new subset of
# relevant directories. We need to rewrite this check so that it works
# after site definitions have been refactored to move the manifests
# under fake repository folders into the common/ folders.
#
# messages.extend(_verify_no_unexpected_files(sitenames=[site_name]))
# If policy is cleartext and error is added this will put
# that particular message into the warns list and all others will
# be added to the error list if SCHEMA_STORAGE_POLICY_MISMATCH_FLAG
messages.extend(_verify_file_contents(sitename=site_name))
# Deckhand rendering completes without error
messages.extend(
_verify_deckhand_render(
sitename=site_name,
fail_on_missing_sub_src=fail_on_missing_sub_src))
return _filter_messages_by_warn_and_error_lint(
messages=messages, exclude_lint=exclude_lint, warn_lint=warn_lint)
def _filter_messages_by_warn_and_error_lint(*,
messages=None,
exclude_lint=None,
warn_lint=None):
"""Helper that only filters messages depending on whether or not they
are present in ``exclude_lint`` or ``warn_lint``.
Bubbles up errors only if the corresponding code for each is **not** found
in either ``exclude_lint`` or ``warn_lint``. If the code is found in
``exclude_lint``, the lint code is ignored; if the code is found in
``warn_lint``, the lint is warned about.
"""
messages = messages or []
exclude_lint = exclude_lint or []
warn_lint = warn_lint or []
errors = []
warns = []
# Create tables to output CLI results
errors_table = PrettyTable()
errors_table.field_names = ['error_code', 'error_message']
warnings_table = PrettyTable()
warnings_table.field_names = ['warning_code', 'warning_message']
# Calculate terminal size to always make sure that the table output
# is readable regardless of screen size
line_length = int(shutil.get_terminal_size().columns / 1.5)
for code, message in messages:
if code in warn_lint:
warns.append('%s: %s' % (code, message))
warnings_table.add_row([code, textwrap.fill(message, line_length)])
elif code not in exclude_lint:
errors.append('%s: %s' % (code, message))
errors_table.add_row([code, textwrap.fill(message, line_length)])
if errors:
raise click.ClickException(
'Linting failed:\n' + errors_table.get_string() +
'\nLinting warnings:\n' + warnings_table.get_string())
return warns
def _verify_no_unexpected_files(*, sitenames=None):
sitenames = sitenames or util.files.list_sites()
expected_directories = set()
for site_name in sitenames:
params = util.definition.load_as_params(site_name)
expected_directories.update(util.files.directories_for(**params))
LOG.debug('expected_directories: %s', expected_directories)
found_directories = util.files.existing_directories()
LOG.debug('found_directories: %s', found_directories)
errors = []
for unused_dir in sorted(found_directories - expected_directories):
errors.append((REPOS_MISSING_DIRECTORIES_FLAG,
'%s exists, but is unused' % unused_dir))
for missing_dir in sorted(expected_directories - found_directories):
if not missing_dir.endswith('common'):
errors.append(
(REPOS_MISSING_DIRECTORIES_FLAG,
'%s was not found, but expected by manifest' % missing_dir))
return errors
def _verify_file_contents(*, sitename=None):
if sitename:
files = util.definition.site_files(sitename)
else:
files = util.files.all()
schemas = _load_schemas()
errors = []
for filename in files:
errors.extend(_verify_single_file(filename, schemas))
return errors
def _verify_single_file(filename, schemas):
errors = []
LOG.debug("Validating file %s.", filename)
with open(filename) as f:
if not f.read(4) == '---\n':
errors.append((FILE_MISSING_YAML_DOCUMENT_HEADER,
'%s does not begin with YAML beginning of document '
'marker "---".' % filename))
documents = []
try:
documents = util.files.read(filename)
except Exception as e:
errors.append((FILE_CONTAINS_INVALID_YAML,
'%s is not valid yaml: %s' % (filename, e)))
for document in documents:
errors.extend(_verify_document(document, schemas, filename))
return errors
MANDATORY_ENCRYPTED_TYPES = {
'deckhand/CertificateAuthorityKey/v1',
'deckhand/CertificateKey/v1',
'deckhand/Passphrase/v1',
'deckhand/PrivateKey/v1',
}
def _verify_document(document, schemas, filename):
name = ':'.join([
document.get('schema', ''),
document.get('metadata', {}).get('name', '')
])
errors = []
layer = _layer(document)
if layer is not None and layer != _expected_layer(filename):
errors.append(
(DOCUMENT_LAYER_MISMATCH,
'%s (document %s) had unexpected layer "%s", expected "%s"' %
(filename, name, layer, _expected_layer(filename))))
# secrets must live in the appropriate directory, and must be
# "storagePolicy: encrypted".
if document.get('schema') in MANDATORY_ENCRYPTED_TYPES:
storage_policy = document.get('metadata', {}).get('storagePolicy')
if (storage_policy != 'encrypted'):
errors.append((SCHEMA_STORAGE_POLICY_MISMATCH_FLAG,
'%s (document %s) is a secret, but has unexpected '
'storagePolicy: "%s"' % (filename, name,
storage_policy)))
# Check if the file is in a secrets directory
if not util.files.file_in_subdir(filename, 'secrets/'):
errors.append((SECRET_NOT_ENCRYPTED_POLICY,
'%s (document %s) is a secret, is not stored in a '
'secrets path' % (filename, name)))
return errors
def _handle_managed_document(doc):
"""
Unwrap a managed document without decrypting it, and convert
the data to an ASCII string if necessary. We're just
using this so that managed documents and the documents
that depend on them can be linted.
:param dict doc: A YAML document
:returns: the processed document
:rtype: dict
"""
if "managedDocument" in doc["data"]:
doc = doc["data"]["managedDocument"]
if isinstance(doc["data"], bytes):
doc["data"] = doc["data"].decode("ascii")
return doc
def _verify_deckhand_render(*, sitename=None, fail_on_missing_sub_src=False):
"""Verify Deckhand render works by using all relevant deployment files.
:returns: List of errors generated during rendering.
"""
all_errors = []
if sitename:
documents_to_render = [_handle_managed_document(doc) for doc in
util.definition.documents_for_site(sitename)]
LOG.debug('Rendering documents for site: %s.', sitename)
_, errors = util.deckhand.deckhand_render(
documents=documents_to_render,
fail_on_missing_sub_src=fail_on_missing_sub_src,
validate=True,
)
LOG.debug('Generated %d rendering errors for site: %s.', len(errors),
sitename)
all_errors.extend(errors)
else:
documents_to_render = util.definition.documents_for_each_site()
for site_name, documents in documents_to_render.items():
clean_documents = [_handle_managed_document(doc) for doc
in documents]
LOG.debug('Rendering documents for site: %s.', site_name)
_, errors = util.deckhand.deckhand_render(
documents=clean_documents,
fail_on_missing_sub_src=fail_on_missing_sub_src,
validate=True,
)
LOG.debug('Generated %d rendering errors for site: %s.',
len(errors), site_name)
all_errors.extend(errors)
return list(set(all_errors))
def _layer(data):
if hasattr(data, 'get'):
return data.get('metadata', {}).get('layeringDefinition',
{}).get('layer')
def _expected_layer(filename):
for r in config.all_repos():
if filename.startswith(r):
partial_name = filename[len(r):]
parts = os.path.normpath(partial_name).split(os.sep)
return parts[0]
def _load_schemas():
schemas = {}
for key, filename in DECKHAND_SCHEMAS.items():
schemas[key] = util.files.slurp(
pkg_resources.resource_filename('pegleg', filename))
return schemas