#!/usr/bin/env python # Copyright 2014 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import argparse import ast import importlib import inspect import os import sys import unittest import uuid from oslo_utils import uuidutils import six.moves.urllib.parse as urlparse DECORATOR_MODULE = 'decorators' DECORATOR_NAME = 'idempotent_id' DECORATOR_IMPORT = 'tempest.%s' % DECORATOR_MODULE IMPORT_LINE = 'from tempest.lib import %s' % DECORATOR_MODULE DECORATOR_TEMPLATE = "@%s.%s('%%s')" % (DECORATOR_MODULE, DECORATOR_NAME) UNIT_TESTS_EXCLUDE = 'tempest.tests' class SourcePatcher(object): """"Lazy patcher for python source files""" def __init__(self): self.source_files = None self.patches = None self.clear() def clear(self): """Clear inner state""" self.source_files = {} self.patches = {} @staticmethod def _quote(s): return urlparse.quote(s) @staticmethod def _unquote(s): return urlparse.unquote(s) def add_patch(self, filename, patch, line_no): """Add lazy patch""" if filename not in self.source_files: with open(filename) as f: self.source_files[filename] = self._quote(f.read()) patch_id = uuidutils.generate_uuid() if not patch.endswith('\n'): patch += '\n' self.patches[patch_id] = self._quote(patch) lines = self.source_files[filename].split(self._quote('\n')) lines[line_no - 1] = ''.join(('{%s:s}' % patch_id, lines[line_no - 1])) self.source_files[filename] = self._quote('\n').join(lines) @staticmethod def _save_changes(filename, source): print('%s fixed' % filename) with open(filename, 'w') as f: f.write(source) def apply_patches(self): """Apply all patches""" for filename in self.source_files: patched_source = self._unquote( self.source_files[filename].format(**self.patches) ) self._save_changes(filename, patched_source) self.clear() class TestChecker(object): def __init__(self, package): self.package = package self.base_path = os.path.abspath(os.path.dirname(package.__file__)) def _path_to_package(self, path): relative_path = path[len(self.base_path) + 1:] if relative_path: return '.'.join((self.package.__name__,) + tuple(relative_path.split('/'))) else: return self.package.__name__ def _modules_search(self): """Recursive search for python modules in base package""" modules = [] for root, _, files in os.walk(self.base_path): if not os.path.exists(os.path.join(root, '__init__.py')): continue root_package = self._path_to_package(root) for item in files: if item.endswith('.py'): module_name = '.'.join((root_package, os.path.splitext(item)[0])) if not module_name.startswith(UNIT_TESTS_EXCLUDE): modules.append(module_name) return modules @staticmethod def _get_idempotent_id(test_node): "Return key-value dict with metadata from @decorators.idempotent_id" idempotent_id = None for decorator in test_node.decorator_list: if (hasattr(decorator, 'func') and hasattr(decorator.func, 'attr') and decorator.func.attr == DECORATOR_NAME and hasattr(decorator.func, 'value') and decorator.func.value.id == DECORATOR_MODULE): for arg in decorator.args: idempotent_id = ast.literal_eval(arg) return idempotent_id @staticmethod def _is_decorator(line): return line.strip().startswith('@') @staticmethod def _is_def(line): return line.strip().startswith('def ') def _add_uuid_to_test(self, patcher, test_node, source_path): with open(source_path) as src: src_lines = src.read().split('\n') lineno = test_node.lineno insert_position = lineno while True: if (self._is_def(src_lines[lineno - 1]) or (self._is_decorator(src_lines[lineno - 1]) and (DECORATOR_TEMPLATE.split('(')[0] <= src_lines[lineno - 1].strip().split('(')[0]))): insert_position = lineno break lineno += 1 patcher.add_patch( source_path, ' ' * test_node.col_offset + DECORATOR_TEMPLATE % uuid.uuid4(), insert_position ) @staticmethod def _is_test_case(module, node): if (node.__class__ is ast.ClassDef and hasattr(module, node.name) and inspect.isclass(getattr(module, node.name))): return issubclass(getattr(module, node.name), unittest.TestCase) @staticmethod def _is_test_method(node): return (node.__class__ is ast.FunctionDef and node.name.startswith('test_')) @staticmethod def _next_node(body, node): if body.index(node) < len(body): return body[body.index(node) + 1] @staticmethod def _import_name(node): if isinstance(node, ast.Import): return node.names[0].name elif isinstance(node, ast.ImportFrom): return '%s.%s' % (node.module, node.names[0].name) def _add_import_for_test_uuid(self, patcher, src_parsed, source_path): with open(source_path) as f: src_lines = f.read().split('\n') line_no = 0 tempest_imports = [node for node in src_parsed.body if self._import_name(node) and 'tempest.' in self._import_name(node)] if not tempest_imports: import_snippet = '\n'.join(('', IMPORT_LINE, '')) else: for node in tempest_imports: if self._import_name(node) < DECORATOR_IMPORT: continue else: line_no = node.lineno import_snippet = IMPORT_LINE break else: line_no = tempest_imports[-1].lineno while True: if (not src_lines[line_no - 1] or getattr(self._next_node(src_parsed.body, tempest_imports[-1]), 'lineno') == line_no or line_no == len(src_lines)): break line_no += 1 import_snippet = '\n'.join((IMPORT_LINE, '')) patcher.add_patch(source_path, import_snippet, line_no) def get_tests(self): """Get test methods with sources from base package with metadata""" tests = {} for module_name in self._modules_search(): tests[module_name] = {} module = importlib.import_module(module_name) source_path = '.'.join( (os.path.splitext(module.__file__)[0], 'py') ) with open(source_path, 'r') as f: source = f.read() tests[module_name]['source_path'] = source_path tests[module_name]['tests'] = {} source_parsed = ast.parse(source) tests[module_name]['ast'] = source_parsed tests[module_name]['import_valid'] = ( hasattr(module, DECORATOR_MODULE) and inspect.ismodule(getattr(module, DECORATOR_MODULE)) ) test_cases = (node for node in source_parsed.body if self._is_test_case(module, node)) for node in test_cases: for subnode in filter(self._is_test_method, node.body): test_name = '%s.%s' % (node.name, subnode.name) tests[module_name]['tests'][test_name] = subnode return tests @staticmethod def _filter_tests(function, tests): """Filter tests with condition 'function(test_node) == True'""" result = {} for module_name in tests: for test_name in tests[module_name]['tests']: if function(module_name, test_name, tests): if module_name not in result: result[module_name] = { 'ast': tests[module_name]['ast'], 'source_path': tests[module_name]['source_path'], 'import_valid': tests[module_name]['import_valid'], 'tests': {} } result[module_name]['tests'][test_name] = \ tests[module_name]['tests'][test_name] return result def find_untagged(self, tests): """Filter all tests without uuid in metadata""" def check_uuid_in_meta(module_name, test_name, tests): idempotent_id = self._get_idempotent_id( tests[module_name]['tests'][test_name]) return not idempotent_id return self._filter_tests(check_uuid_in_meta, tests) def report_collisions(self, tests): """Reports collisions if there are any Returns true if collisions exist. """ uuids = {} def report(module_name, test_name, tests): test_uuid = self._get_idempotent_id( tests[module_name]['tests'][test_name]) if not test_uuid: return if test_uuid in uuids: error_str = "%s:%s\n uuid %s collision: %s<->%s\n%s:%s" % ( tests[module_name]['source_path'], tests[module_name]['tests'][test_name].lineno, test_uuid, test_name, uuids[test_uuid]['test_name'], uuids[test_uuid]['source_path'], uuids[test_uuid]['test_node'].lineno, ) print(error_str) print("cannot automatically resolve the collision, please " "manually remove the duplicate value on the new test.") return True else: uuids[test_uuid] = { 'module': module_name, 'test_name': test_name, 'test_node': tests[module_name]['tests'][test_name], 'source_path': tests[module_name]['source_path'] } return bool(self._filter_tests(report, tests)) def report_untagged(self, tests): """Reports untagged tests if there are any Returns true if untagged tests exist. """ def report(module_name, test_name, tests): error_str = ("%s:%s\nmissing @decorators.idempotent_id" "('...')\n%s\n") % ( tests[module_name]['source_path'], tests[module_name]['tests'][test_name].lineno, test_name ) print(error_str) return True return bool(self._filter_tests(report, tests)) def fix_tests(self, tests): """Add uuids to all specified in tests and fix it in source files""" patcher = SourcePatcher() for module_name in tests: add_import_once = True for test_name in tests[module_name]['tests']: if not tests[module_name]['import_valid'] and add_import_once: self._add_import_for_test_uuid( patcher, tests[module_name]['ast'], tests[module_name]['source_path'] ) add_import_once = False self._add_uuid_to_test( patcher, tests[module_name]['tests'][test_name], tests[module_name]['source_path']) patcher.apply_patches() def run(): parser = argparse.ArgumentParser() parser.add_argument('--package', action='store', dest='package', default='tempest', type=str, help='Package with tests') parser.add_argument('--fix', action='store_true', dest='fix_tests', help='Attempt to fix tests without UUIDs') args = parser.parse_args() sys.path.append(os.path.join(os.path.dirname(__file__), '..')) pkg = importlib.import_module(args.package) checker = TestChecker(pkg) errors = False tests = checker.get_tests() untagged = checker.find_untagged(tests) errors = checker.report_collisions(tests) or errors if args.fix_tests and untagged: checker.fix_tests(untagged) else: errors = checker.report_untagged(untagged) or errors if errors: sys.exit("@decorators.idempotent_id existence and uniqueness checks " "failed\n" "Run 'tox -v -e uuidgen' to automatically fix tests with\n" "missing @decorators.idempotent_id decorators.") if __name__ == '__main__': run()