Adding ManifestValidator

Partial-blueprint: package-validation

Change-Id: I5f98149dcdf6011a03d41faf585a8c087402e559
This commit is contained in:
Krzysztof Szukiełojć 2016-08-22 14:21:33 +02:00
parent c59ed6dec5
commit 3470094b08
6 changed files with 464 additions and 0 deletions

View File

@ -0,0 +1,107 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from muranopkgcheck.tests import test_validator_helpers as helpers
from muranopkgcheck.validators import manifest
class ManifestValidatorTests(helpers.BaseValidatorTestClass):
def setUp(self):
super(ManifestValidatorTests, self).setUp()
self._oe_patcher = mock.patch('os.path.exists')
self.exists = self._oe_patcher.start()
self.exists.return_value = [True, True]
self.loaded_package = mock.Mock()
self.mv = manifest.ManifestValidator(self.loaded_package)
def test_format_as_number(self):
self.g = self.mv._valid_format(1.3)
def test_wrong_format(self):
self.g = self.mv._valid_format('0.9')
self.assertIn('Not supported format version "0.9"',
next(self.g).message)
def test_valid_string(self):
self.g = self.mv._valid_string([])
self.assertIn('Value is not a string "[]"',
next(self.g).message)
def test_heat_format(self):
self.g = self.mv._valid_format('Heat/1.0')
self.assertIn('Not supported format "Heat/1.0"',
next(self.g).message)
def test_unsupported_format(self):
self.g = self.mv._valid_format('Heat.HOT')
self.assertIn('Not supported format version "Heat.HOT"',
next(self.g).message)
def test_wrong_type(self):
self.g = self.mv._valid_type('Shared Library')
self.assertIn('Type is invalid "Shared Library"', next(self.g).message)
def test_wrong_require_type(self):
self.g = self.mv._valid_require([1, 2, 3])
self.assertIn('Require is not a dict type', next(self.g).message)
def test_not_existing_file(self):
data = {'org.openstack.Flow': 'FlowClassifier.yaml',
'org.openstack.Instance': 'Instance.yaml'}
self.loaded_package.search_for.return_value = ['FlowClassifier.yaml']
self.g = self.mv._valid_classes(data)
self.assertIn('File is present in Manifest Instance.yaml, but not in '
'filesystem', next(self.g).message)
def test_extra_file_in_directory(self):
data = {'org.openstack.Instance': 'Instance.yaml'}
self.loaded_package.search_for.return_value = ['FlowClassifier.yaml',
'Instance.yaml']
self.g = self.mv._valid_classes(data)
self.assertIn('File is not present in Manifest, but it is in '
'filesystem: FlowClassifier.yaml', next(self.g).message)
def test_classess_list(self):
data = [{'org.openstack.Instance': 'Instance.yaml'}]
self.loaded_package.search_for.return_value = ['FlowClassifier.yaml',
'Instance.yaml']
self.g = self.mv._valid_classes(data)
self.assertIn('Classes section should be a dict', next(self.g).message)
def test_missing_ui_file(self):
self.loaded_package.exists.return_value = False
self.g = self.mv._valid_ui('ui.yaml')
self.assertIn('There is no UI file "ui.yaml"',
next(self.g).message)
def test_missing_logo_file(self):
self.loaded_package.exists.return_value = False
self.g = self.mv._valid_logo('logo.png')
self.assertIn('There is no Logo file "logo.png"',
next(self.g).message)
def test_wrong_logo_type(self):
self.g = self.mv._valid_logo([1, 2, 3])
self.assertIn('Logo is not a string', next(self.g).message)
def test_wrong_ui_type(self):
self.g = self.mv._valid_ui([1, 2, 3])
self.assertIn('UI is not a string', next(self.g).message)
def test_tags(self):
self.g = self.mv._valid_tags('whatever')
self.assertIn('Tags should be a list', next(self.g).message)

View File

@ -0,0 +1,43 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import types
import unittest
class BaseValidatorTestClass(unittest.TestCase):
def setUp(self):
self._g = []
def tearDown(self):
problems = [p for p in self.g]
for p in problems:
print('Left errors:', p)
self.assertEqual(len(problems), 0)
def _linear(self, error_chain):
for e in error_chain:
if isinstance(e, types.GeneratorType):
for w in self._linear(e):
yield w
else:
yield e
def get_g(self):
return self._g
def set_g(self, value):
self._g = self._linear(value)
g = property(get_g, set_g)

View File

@ -0,0 +1,65 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from muranopkgcheck.validators import base
class YamlValidatorTest(unittest.TestCase):
def setUp(self):
self.pkg = mock.Mock()
self.pkg.search_for.return_value = ['sth']
self.fmock = mock.Mock()
self.document = mock.Mock()
self.pkg.read.return_value = self.fmock
self.v = base.YamlValidator(self.pkg, '***')
def test_checker_with_ast(self):
c = mock.Mock()
c.return_value = 'ok'
self.fmock.yaml.return_value = [{}]
self.v.add_checker(c)
self.v.run()
c.assert_called_once_with({})
self.pkg.search_for.assert_called_once_with('***')
def test_run_single_with_key_checker(self):
c = mock.Mock()
c.return_value = 'ok'
self.fmock.yaml.return_value = [{'key': 'whatever'}]
self.v.add_checker(c, 'key')
self.v.run()
c.assert_called_once_with('whatever')
self.pkg.search_for.assert_called_once_with('***')
def test_two_keys_unknown_key(self):
c = mock.Mock()
c.return_value = None
self.fmock.yaml.return_value = [{'key': 'whatever',
'unknown': ''}]
self.v.add_checker(c, 'key')
errors = self.v.run()
c.assert_called_once_with('whatever')
self.pkg.search_for.assert_called_once_with('***')
self.assertIn('Unknown keyword "unknown"', next(errors).message)
def test_missing_required_key(self):
c = mock.Mock()
self.fmock.yaml.return_value = [{}]
self.v.add_checker(c, 'key')
errors = self.v.run()
self.pkg.search_for.assert_called_once_with('***')
self.assertIn('Missing required key "key"', next(errors).message)

View File

@ -12,5 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from muranopkgcheck.validators import manifest
VALIDATORS = [
manifest.ManifestValidator,
]

View File

@ -0,0 +1,139 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import re
import six
from muranopkgcheck import error
from muranopkgcheck import log
LOG = log.get_logger(__name__)
FQN_REGEX = re.compile('^([a-zA-Z_$][\w$]*\.)*[a-zA-Z_$][\w$]*$')
NAME_REGEX = re.compile('^[A-Za-z_][\w]*$')
def check_version(method, version):
since = method._mpl_since
till = method._mpl_till
if (not since or version >= since) and\
(not till or version <= till):
return True
return False
def format_support(since=None, till=None):
def _func(func):
func._mpl_since = since
func._mpl_till = till
return func
return _func
@six.add_metaclass(abc.ABCMeta)
class BaseValidator(object):
def __init__(self, loaded_package, _filter='.*'):
self._loaded_pkg = loaded_package
self._filter = _filter
@abc.abstractmethod
def _run_single(self, file_):
pass
def run(self):
chain_of_suits = []
for filename in self._loaded_pkg.search_for(self._filter):
file_ = self._loaded_pkg.read(filename)
chain_of_suits.append(self._run_single(file_))
return itertools.chain(*chain_of_suits)
def _valid_string(self, value):
if not isinstance(value, (int, float, bool, six.string_types)):
yield error.report.E040('Value is not a string "{0}"'
.format(value),
value)
def _check_name(self, name):
if NAME_REGEX.match(name):
return True
return False
def _check_fqn_name(self, fqn):
if isinstance(fqn, six.string_types) and FQN_REGEX.match(fqn):
return True
return False
class YamlValidator(BaseValidator):
def __init__(self, loaded_package, _filter='.*', allows_multi=False):
super(YamlValidator, self).__init__(loaded_package, _filter)
self._checkers = {}
self._allows_multi = allows_multi
def add_checker(self, function, key=None, required=True):
checkers = self._checkers.setdefault(key, {'checkers': [],
'required': False})
checkers['checkers'].append(function)
if key is None:
checkers['required'] = False
elif required:
checkers['required'] = True
def _run_single(self, file_):
reports_chain = []
multi_documents = file_.yaml()
if multi_documents is None:
multi_documents = [{}]
def run_helper(name, checkers, data):
for checker in checkers:
result = checker(data)
if result:
reports_chain.append(result)
if len(multi_documents) > 1 and not self._allows_multi:
reports_chain.append(
error.report.E005('Multi document is not allowed in {0}'
.format(file_._path)))
for ast in multi_documents:
file_check = self._checkers.get(None)
if file_check:
run_helper(None, file_check['checkers'], ast)
for key, value in six.iteritems(ast):
checkers = self._checkers.get(key)
if checkers:
run_helper(key, checkers['checkers'], ast[key])
else:
reports_chain.append(self._unknown_keyword(key, value))
missing = set(key for key, value in six.iteritems(self._checkers)
if value['required']) - set(ast.keys())
for m in missing:
reports_chain.append([error.report.E020('Missing required key '
'"{0}"'.format(m), m)])
return itertools.chain(*reports_chain)
def _valid_keywords(self, present, known):
unknown = set(present) - set(known)
for u in unknown:
yield error.report.E021('Unknown keyword "{0}"'.format(u), u)
def _unknown_keyword(self, key, value):
yield error.report.W010('Unknown keyword "{0}"'.format(key), key)
def _null_checker(self, value):
pass

View File

@ -0,0 +1,107 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import six
from muranopkgcheck import error
from muranopkgcheck.validators import base
class ManifestValidator(base.YamlValidator):
def __init__(self, loaded_package):
super(ManifestValidator, self).__init__(loaded_package,
'manifest.yaml$')
self.add_checker(self._valid_format, 'Format', False)
self.add_checker(self._valid_string, 'Author', False)
self.add_checker(self._valid_string, 'Version', False)
self.add_checker(self._valid_fullname, 'FullName')
self.add_checker(self._valid_string, 'Name', False)
self.add_checker(self._valid_classes, 'Classes', False)
self.add_checker(self._valid_tags, 'Tags', False)
self.add_checker(self._valid_require, 'Require', False)
self.add_checker(self._valid_type, 'Type')
self.add_checker(self._valid_string, 'Description')
self.add_checker(self._valid_ui, 'UI', False)
self.add_checker(self._valid_logo, 'Logo', False)
self.add_checker(self._valid_logo_ui_existance)
def _valid_format(self, value):
format_ = str(value).split('/', 1)
if len(format_) > 1:
if format_[0] != 'MuranoPL':
yield error.report.E030('Not supported format "{0}"'
.format(value), value)
ver = format_[-1]
if str(ver) not in ['1.0', '1.1', '1.2', '1.3', '1.4']:
yield error.report.W030('Not supported format version "{0}"'
.format(value), value)
def _valid_fullname(self, fullname):
if not self._check_fqn_name(fullname):
yield error.report.E073('Invalid FullName "{0}"', fullname)
def _valid_tags(self, value):
if not isinstance(value, list):
yield error.report.E070('Tags should be a list', value)
def _valid_require(self, value):
if not isinstance(value, dict):
yield error.report.E005('Require is not a dict type', value)
def _valid_type(self, value):
if value not in ('Application', 'Library'):
yield error.report.E071('Type is invalid "{0}"'.format(value),
value)
def _valid_logo_ui_existance(self, ast):
if 'Logo' not in ast:
yield self._valid_logo('logo.png')
if 'UI' not in ast:
yield self._valid_ui('ui.yaml')
def _valid_ui(self, value):
if isinstance(value, six.string_types):
if not self._loaded_pkg.exists(os.path.join('UI', value)):
yield error.report.W073('There is no UI file "{0}"'
.format(value), value)
else:
yield error.report.E072('UI is not a string', value)
def _valid_logo(self, value):
if isinstance(value, six.string_types):
if not self._loaded_pkg.exists(value):
yield error.report.W074('There is no Logo file "{0}"'
.format(value), value)
else:
yield error.report.E074('Logo is not a string', value)
def _valid_classes(self, value):
if not isinstance(value, dict):
yield error.report.E074('Classes section should be a dict', value)
return
files = set(value.values())
existing_files = set(self._loaded_pkg.search_for('.*',
'Classes'))
for fname in files - existing_files:
yield error.report.E050('File is present in Manifest {fname}, '
'but not in filesystem'
.format(fname=fname),
fname)
for fname in existing_files - files:
yield error.report.W020('File is not present in Manifest, but '
'it is in filesystem: {fname}'
.format(fname=fname), fname)