Merge "Move roles utilities to common"

This commit is contained in:
Jenkins 2017-06-21 23:54:04 +00:00 committed by Gerrit Code Review
commit 8db0a2d95a
3 changed files with 252 additions and 0 deletions

View File

@ -110,3 +110,11 @@ class PlanOperationError(Exception):
class DeriveParamsError(Exception): class DeriveParamsError(Exception):
"""Error while performing a derive parameters operation""" """Error while performing a derive parameters operation"""
class NotFound(Exception):
"""Resource not found"""
class RoleMetadataError(Exception):
"""Role metadata is invalid"""

View File

@ -0,0 +1,121 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
import yaml
from tripleo_common.exception import NotFound
from tripleo_common.exception import RoleMetadataError
from tripleo_common.tests import base
from tripleo_common.utils import roles as rolesutils
SAMPLE_ROLE = """
###############################################################################
# Role: sample #
###############################################################################
- name: sample
description: |
Sample!
networks:
- InternalApi
HostnameFormatDefault: '%stackname%-sample-%index%'
ServicesDefault:
- OS::TripleO::Services::Ntp
"""
SAMPLE_ROLE_OBJ = {
'HostnameFormatDefault': '%stackname%-sample-%index%',
'ServicesDefault': ['OS::TripleO::Services::Ntp'],
'description': 'Sample!\n',
'name': 'sample',
'networks': ['InternalApi']
}
class TestRolesUtils(base.TestCase):
@mock.patch('os.listdir')
@mock.patch('os.path.exists')
def test_get_roles_from_directory(self, exists_mock, listdir_mock):
exists_mock.return_value = True
listdir_mock.return_value = ['b.yaml', 'a.yaml']
self.assertEqual(rolesutils.get_roles_list_from_directory('/foo'),
['a', 'b'])
@mock.patch('os.listdir')
@mock.patch('os.path.exists')
def test_get_roles_from_directory_failure(self, exists_mock, listdir_mock):
exists_mock.return_value = False
self.assertRaises(ValueError, rolesutils.get_roles_list_from_directory,
['/foo'])
def test_validate_roles(self):
available_roles = ['a', 'b', 'c']
requested_roles = ['b', 'c']
try:
rolesutils.check_role_exists(available_roles, requested_roles)
except Exception:
self.fail('Exception raised')
def test_validate_roles_with_invalid_role(self):
available_roles = ['a', 'b', 'c']
requested_roles = ['b', 'd']
self.assertRaises(NotFound, rolesutils.check_role_exists,
available_roles, requested_roles)
@mock.patch('tripleo_common.utils.roles.check_role_exists')
@mock.patch('tripleo_common.utils.roles.get_roles_list_from_directory')
def test_generate_roles_data_from_directory(self, get_roles_mock,
check_mock):
get_roles_mock.return_value = ['foo', 'bar', 'baz']
m = mock.mock_open(read_data=SAMPLE_ROLE)
with mock.patch('tripleo_common.utils.roles.open', m) as open_mock:
r = rolesutils.generate_roles_data_from_directory('/foo',
['foo', 'bar'])
open_mock.assert_any_call('/foo/foo.yaml', 'r')
open_mock.assert_any_call('/foo/bar.yaml', 'r')
header = '\n'.join(["#" * 79,
"# File generated by TripleO",
"#" * 79,
""])
expected = header + SAMPLE_ROLE * 2
self.assertEqual(expected, r)
get_roles_mock.assert_called_with('/foo')
check_mock.assert_called_with(['foo', 'bar', 'baz'], ['foo', 'bar'])
def test_validate_role_yaml(self):
role = rolesutils.validate_role_yaml(SAMPLE_ROLE)
self.assertEqual(SAMPLE_ROLE_OBJ, role)
def test_validate_role_yaml_with_file(self):
m = mock.mock_open(read_data=SAMPLE_ROLE)
with mock.patch('tripleo_common.utils.roles.open', m):
r = rolesutils.validate_role_yaml(role_path='/foo.yaml')
self.assertEqual(SAMPLE_ROLE_OBJ, r)
def test_validate_role_yaml_invalid_params(self):
self.assertRaises(ValueError, rolesutils.validate_role_yaml, 'foo',
'bar')
def test_validate_role_yaml_missing_name(self):
role = yaml.safe_load(SAMPLE_ROLE)
del role[0]['name']
self.assertRaises(RoleMetadataError, rolesutils.validate_role_yaml,
yaml.dump(role))
def test_validate_role_yaml_invalid_type(self):
role = yaml.safe_load(SAMPLE_ROLE)
role[0]['CountDefault'] = 'should not be a string'
self.assertRaises(RoleMetadataError, rolesutils.validate_role_yaml,
yaml.dump(role))

View File

@ -0,0 +1,123 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
import shutil
import yaml
from six.moves import cStringIO as StringIO
from tripleo_common.exception import NotFound
from tripleo_common.exception import RoleMetadataError
def get_roles_list_from_directory(directory):
"""Return array of roles in roles path"""
if not os.path.exists(directory):
raise ValueError("Invalid roles path specified: {}".format(
directory))
roles = []
for f in os.listdir(directory):
if f.endswith(".yaml"):
roles.append(f[:-5])
roles.sort()
return roles
def check_role_exists(available_roles, requested_roles):
"""Performs a check on the requested roles to ensure they exist
:param available_roles list of available role names
:param requested_roles list of requested role names
:exception NotFound if a role in the requested list is not available
"""
role_check = set(requested_roles) - set(available_roles)
if len(role_check) > 0:
msg = "Invalid roles requested: {}\nValid Roles:\n{}".format(
','.join(role_check), '\n'.join(available_roles)
)
raise NotFound(msg)
def generate_roles_data_from_directory(directory, roles, validate=True):
"""Generate a roles data file using roles from a local path
:param directory local filesystem path to the roles
:param roles ordered list of roles
:param validate validate the metadata format in the role yaml files
:returns string contents of the roles_data.yaml
"""
available_roles = get_roles_list_from_directory(directory)
check_role_exists(available_roles, roles)
output = StringIO()
header = ["#" * 79,
"# File generated by TripleO",
"#" * 79,
""]
output.write("\n".join(header))
for role in roles:
file_path = os.path.join(directory, "{}.yaml".format(role))
if validate:
validate_role_yaml(role_path=file_path)
with open(file_path, "r") as f:
shutil.copyfileobj(f, output)
return output.getvalue()
def validate_role_yaml(role_data=None, role_path=None):
"""Basic role yaml validation
:param role_data the role yaml in string form
:param role_path the path to the yaml file to validate.
:exception RoleMetadataError
:returns parsed role yaml object
"""
if role_data and role_path or (not role_data and not role_path):
raise ValueError('Either role_data OR role_path must be specified')
if role_path:
with open(role_path, 'r') as f:
role_data = f.read()
try:
role = yaml.safe_load(role_data)[0]
except yaml.YAMLError:
raise RoleMetadataError('Unable to parse role yaml')
schema = {
'name': {'type': str},
'CountDefault': {'type': int},
'HostnameFormatDefault': {'type': str},
'disable_constraints': {'type': bool},
'disable_upgrade_deployment': {'type': bool},
'upgrade_batch_size': {'type': int},
'ServicesDefault': {'type': list},
'tags': {'type': list},
'description': {'type': str},
'networks': {'type': list},
}
if 'name' not in role:
raise RoleMetadataError('Role name is missing from the role')
# validate numeric metadata is numeric
for k in schema:
if k in role and not isinstance(role[k], schema[k]['type']):
msg = "Role '{}': {} is not of expected type {}".format(
role['name'], k, schema[k]['type'])
raise RoleMetadataError(msg)
return role