Files
tripleo-common/tripleo_common/utils/validations.py
Ana Krivokapic 8f88e78778 Add support for custom validations
This patch introduces support for running custom validations by changing
the behavior of the validations actions ListValidationsAction,
ListGroupsAction and RunValidationAction.

Until now, these actions sourced validations from a directory on disk.
Now, these actions source validations from the plan container
subdirectory (custom validations), or, if this is not available, from
the Swift container holding the default validations.

Change-Id: I9e9131b355312c53f12d154976d5d9cd706cc338
Implements: blueprint custom-validations
Depends-On: I338e139fa770ebb7bdcc1c0afb79eec062fada8b
2018-08-10 11:21:29 +02:00

179 lines
5.7 KiB
Python

# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import re
import tempfile
import yaml
from oslo_concurrency import processutils
from swiftclient import exceptions as swiftexceptions
from tripleo_common import constants
import tripleo_common.utils.swift as swift_utils
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Fallback metadata values, used by get_validation_metadata() when a
# validation does not define the requested key.
DEFAULT_METADATA = {
    'name': 'Unnamed',
    'description': 'No description',
    'stage': 'No stage',
    'groups': [],
}
def get_validation_metadata(validation, key):
    """Return a single metadata value of a parsed validation playbook.

    The value is read from ``validation[0]['vars']['metadata'][key]``.

    :param validation: parsed YAML content of a validation playbook
    :param key: name of the metadata entry to look up
    :returns: the metadata value. When the playbook is an empty list or one
              of the ``vars`` / ``metadata`` / ``key`` levels is missing,
              the matching ``DEFAULT_METADATA`` entry is returned (``None``
              for keys without a default). When the playbook has an
              unexpected structure, the problem is logged and ``None`` is
              returned.
    """
    try:
        return validation[0]['vars']['metadata'][key]
    except (KeyError, IndexError):
        # IndexError: an empty play list carries no metadata at all, which
        # is handled like a missing key instead of aborting the caller.
        return DEFAULT_METADATA.get(key)
    except TypeError:
        # Raised e.g. for an empty YAML document (None) or when one of the
        # levels is not a mapping.
        LOG.exception("Failed to get validation metadata.")
        return None
def _get_validations_from_swift(swift, container, objects, groups, results,
skip_existing=False):
existing_ids = [validation['id'] for validation in results]
for obj in objects:
validation_id, ext = os.path.splitext(obj['name'])
if ext != '.yaml':
continue
if skip_existing and validation_id in existing_ids:
continue
contents = swift.get_object(container, obj['name'])[1]
validation = yaml.safe_load(contents)
validation_groups = get_validation_metadata(validation, 'groups') or []
if not groups or set.intersection(set(groups), set(validation_groups)):
results.append({
'id': validation_id,
'name': get_validation_metadata(validation, 'name'),
'groups': get_validation_metadata(validation, 'groups'),
'description': get_validation_metadata(validation,
'description'),
'metadata': get_remaining_metadata(validation)
})
return results
def load_validations(swift, plan, groups=None):
    """Collect the validations available for a plan.

    The custom validations stored in the plan container are gathered first,
    followed by the default validations. A default validation whose id was
    already collected is skipped, so the custom version takes precedence.

    :param swift: Swift client
    :param plan: name of the plan, which is also the name of its container
    :param groups: optional group names used to filter the validations
    :returns: list of dicts, each one describing a single validation
    """
    validations = []

    # Custom validations live in a dedicated folder of the plan container.
    try:
        custom_listing = swift.get_container(
            plan, prefix=constants.CUSTOM_VALIDATIONS_FOLDER)
    except swiftexceptions.ClientException:
        # Best effort: carry on with the default validations only.
        pass
    else:
        validations = _get_validations_from_swift(
            swift, plan, custom_listing[1], groups, validations)

    # Default validations come from their own container.
    default_container = constants.VALIDATIONS_CONTAINER_NAME
    default_objects = swift.get_container(default_container)[1]
    return _get_validations_from_swift(
        swift, default_container, default_objects, groups, validations,
        skip_existing=True)
def get_remaining_metadata(validation):
    """Return the metadata entries other than name, description and groups.

    The metadata is read from ``validation[0]['vars']['metadata']``.

    :param validation: parsed YAML content of a validation playbook
    :returns: dict of the remaining metadata entries; an empty dict when the
              playbook is empty, lacks the ``vars``/``metadata`` levels or
              does not have the expected structure
    """
    try:
        metadata = validation[0]['vars']['metadata']
    except (KeyError, IndexError, TypeError):
        # Missing levels, an empty play list or an empty/malformed document:
        # there is nothing to report, and a single broken playbook must not
        # abort the listing of all the others.
        return {}

    if not isinstance(metadata, dict):
        # e.g. "metadata:" left empty in the playbook
        return {}

    return {k: v for k, v in metadata.items()
            if k not in ('name', 'description', 'groups')}
def download_validation(swift, plan, validation):
    """Download validations from Swift and return the path of one of them.

    The whole default validations container is downloaded to
    ``/tmp/<plan>-validations``. If the plan container holds a custom
    validation with the requested name, it overwrites the default file.

    :param swift: Swift client
    :param plan: name of the plan, which is also the name of its container
    :param validation: id of the validation (file name without ``.yaml``)
    :returns: local path of the validation file
    """
    # NOTE(review): fixed, predictable location under /tmp derived from the
    # plan name; confirm this is acceptable on shared hosts.
    dst_dir = '/tmp/{}-validations'.format(plan)

    # Download the whole default validations container
    swift_utils.download_container(
        swift,
        constants.VALIDATIONS_CONTAINER_NAME,
        dst_dir,
        overwrite_only_newer=True
    )

    filename = '{}.yaml'.format(validation)
    swift_path = os.path.join(constants.CUSTOM_VALIDATIONS_FOLDER, filename)
    dst_path = os.path.join(dst_dir, filename)

    # If a custom validation with that name exists, get it from the plan
    # container and override. Otherwise, the default one will be used.
    try:
        contents = swift.get_object(plan, swift_path)[1]
    except swiftexceptions.ClientException:
        pass
    else:
        # The object body is bytes on Python 3; writing bytes through a
        # text-mode file raises TypeError, so pick the mode from the type.
        mode = 'wb' if isinstance(contents, bytes) else 'w'
        with open(dst_path, mode) as f:
            f.write(contents)

    return dst_path
def run_validation(swift, validation, identity_file, plan, context):
    """Run one validation with /usr/bin/run-validation as 'validations'.

    The validation file is first fetched with download_validation(). The
    command is started through sudo, with the OpenStack credentials taken
    from ``context`` passed as ``OS_*`` variable assignments.

    :param swift: Swift client
    :param validation: id of the validation to run
    :param identity_file: path of the SSH private key file
    :param plan: name of the plan
    :param context: object providing ``auth_uri``, ``user_name``,
                    ``auth_token`` and ``project_name``
    :returns: whatever ``processutils.execute`` returns for the command
    """
    credentials = [
        'OS_AUTH_URL={}'.format(context.auth_uri),
        'OS_USERNAME={}'.format(context.user_name),
        'OS_AUTH_TOKEN={}'.format(context.auth_token),
        'OS_TENANT_NAME={}'.format(context.project_name),
    ]
    validation_path = download_validation(swift, plan, validation)

    command = ['/usr/bin/sudo', '-u', 'validations']
    command.extend(credentials)
    command.extend(['/usr/bin/run-validation', validation_path,
                    identity_file, plan])
    return processutils.execute(*command)
def write_identity_file(key):
    """Write the SSH private key to disk

    The key is stored in a new temporary file (prefix
    ``validations_identity_``) whose ownership is then handed over to the
    ``validations`` user with ``sudo chown``.

    :param key: content of the SSH private key
    :returns: path of the file holding the key
    :raises: any error raised while writing the key or changing the owner;
             in that case the temporary file is removed again
    """
    fd, path = tempfile.mkstemp(prefix='validations_identity_')
    handed_over = False
    try:
        LOG.debug('Writing SSH key to disk at %s', path)
        with os.fdopen(fd, 'w') as tmp:
            tmp.write(key)
        processutils.execute('/usr/bin/sudo', '/usr/bin/chown', '-h',
                             'validations:', path)
        handed_over = True
    finally:
        if not handed_over:
            # The caller never learns the path when this function raises,
            # so the private key must not be left behind on disk.
            try:
                os.unlink(path)
            except OSError:
                LOG.warning('Could not remove identity file %s', path)
    return path
def cleanup_identity_file(path):
    """Delete the SSH private key file stored at ``path``.

    The removal is done with ``rm -f`` through sudo.

    :param path: path of the identity file to remove
    """
    LOG.debug('Cleaning up identity file at %s', path)
    remove_command = ('/usr/bin/sudo', '/usr/bin/rm', '-f', path)
    processutils.execute(*remove_command)
def pattern_validator(pattern, value):
    """Tell whether ``value`` matches the regular expression ``pattern``.

    The check uses ``re.match``, so the pattern is applied at the start of
    ``value``.

    :param pattern: regular expression to apply
    :param value: string to check
    :returns: True when the pattern matches, False otherwise
    """
    LOG.debug('Validating %s with pattern %s', value, pattern)
    return re.match(pattern, value) is not None