spyglass/spyglass/validators/json_validator.py

# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from glob import glob
import logging
import os

from jsonschema import Draft7Validator
import yaml

from spyglass import exceptions
from spyglass.validators.validator import BaseDocumentValidator

LOG = logging.getLogger(__name__)
LOG_FORMAT = '%(asctime)s %(levelname)-8s %(name)s:' \
             '%(funcName)s [%(lineno)3d] %(message)s'


class JSONSchemaValidator(BaseDocumentValidator):
"""Validator for validating documents using jsonschema"""
def __init__(
self,
document_path,
schema_path,
document_extension='.yaml',
schema_extension='.yaml',
document_loader=yaml.safe_load,
schema_loader=yaml.safe_load):
super().__init__()
# Check that given paths are valid
if not os.path.exists(document_path):
raise exceptions.PathDoesNotExistError(
file_type='Document', path=document_path)
if not os.path.exists(schema_path):
raise exceptions.PathDoesNotExistError(
file_type='Schema', path=schema_path)
# Extract list of document file paths from path
if os.path.isdir(document_path):
# Create match string and use glob to generate list of file paths
match = os.path.join(document_path, '**', '*' + document_extension)
self.documents = glob(match, recursive=True)
# Directory should not be empty
if not self.documents:
raise exceptions.DirectoryEmptyError(
ext=document_extension, path=document_path)
        elif os.path.splitext(document_path)[1] == document_extension:
            # A single file can be appended to the list and processed the
            # same way, so long as its extension matches
            self.documents.append(document_path)
        else:
            # Raise an error if an unexpected file type is given
            raise exceptions.UnexpectedFileType(
                found_ext=os.path.splitext(document_path)[1],
                expected_ext=document_extension)
# Extract list of schema file paths from path
if os.path.isdir(schema_path):
# Create match string and use glob to generate list of file paths
match = os.path.join(schema_path, '**', '*' + schema_extension)
self.schemas = glob(match, recursive=True)
# Directory should not be empty
if not self.schemas:
raise exceptions.DirectoryEmptyError(
ext=schema_extension, path=schema_path)
        elif os.path.splitext(schema_path)[1] == schema_extension:
            # A single file can be appended to the list and processed the
            # same way, so long as its extension matches
            self.schemas.append(schema_path)
        else:
            # Raise an error if an unexpected file type is given
            raise exceptions.UnexpectedFileType(
                found_ext=os.path.splitext(schema_path)[1],
                expected_ext=schema_extension)
# Initialize pairs list for next step
self.document_schema_pairs = []
self.document_loader = document_loader
self.schema_loader = schema_loader
self._match_documents_to_schemas()

    def _match_documents_to_schemas(self):
        """Pairs documents to their schemas for easier processing

        Loops through all documents and finds each document's associated
        schema by matching the "schema" key from the document with the
        "metadata:name" key from the schemas. Matching document/schema pairs
        are added to document_schema_pairs. A warning is logged for any
        document left unmatched.
        """
if not self.documents:
LOG.warning('No documents found.')
if not self.schemas:
LOG.warning('No schemas found.')
for document in self.documents:
pair_found = False
with open(document, 'r') as f_doc:
loaded_doc = self.document_loader(f_doc)
if 'schema' in loaded_doc:
schema_name = loaded_doc['schema']
for schema in self.schemas:
with open(schema, 'r') as f_schema:
loaded_schema = self.schema_loader(f_schema)
if schema_name == loaded_schema['metadata']['name']:
self.document_schema_pairs.append((document, schema))
pair_found = True
break
else:
LOG.warning('No schema entry found for file %s', document)
if not pair_found:
LOG.warning(
'No matching schema found for file %s, '
'data will not be validated.', document)

    def _validate_file(self, document, schema):
        """Validate a document against a schema using JSON Schema Draft 7

        :param document: File path to the document to validate
        :param schema: File path to the schema used to validate the document
        :return: A list of errors from the validator
        """
with open(document, 'r') as f_doc:
loaded_doc = self.document_loader(f_doc)
with open(schema, 'r') as f_schema:
loaded_schema = self.schema_loader(f_schema)
validator = Draft7Validator(loaded_schema)
return sorted(validator.iter_errors(loaded_doc), key=lambda e: e.path)

    def validate(self):
        """Validates each document against its matching schema

        Loops through the document_schema_pairs list and validates each pair.
        Any errors are logged and returned in a dictionary keyed by file name.

        :return: A dictionary mapping file names to their lists of validation
                 errors
        """
error_list = {}
for document, schema in self.document_schema_pairs:
LOG.info(
'Validating document %s using schema %s', document, schema)
errors = self._validate_file(document, schema)
if errors:
for error in errors:
LOG.error(error.message)
error_list[document] = errors
return error_list
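

if __name__ == '__main__':
    # Hedged usage sketch, not part of the original module: the directory
    # names below are hypothetical placeholders. Every matched document is
    # validated against its schema, and the process exits non-zero if any
    # document fails validation or a path cannot be used.
    import sys

    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    try:
        validator = JSONSchemaValidator(
            document_path='site_documents', schema_path='site_schemas')
    except exceptions.PathDoesNotExistError as error:
        LOG.error(error)
        sys.exit(1)
    results = validator.validate()
    sys.exit(1 if results else 0)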