Merge "Adding class filter and fixing the loss of module path name. Allow for better filtering."

This commit is contained in:
Jenkins
2014-05-21 22:08:33 +00:00
committed by Gerrit Code Review

View File

@@ -16,25 +16,24 @@ limitations under the License.
"""
import argparse
import imp
import os
import pkgutil
import sys
import time
from fnmatch import fnmatch
from inspect import getmembers, isclass
from multiprocessing import Process, Manager
from re import search
from traceback import extract_tb
import unittest
import uuid
from importlib import import_module
from inspect import isclass
from multiprocessing import Process, Manager
from re import search
from traceback import print_exc
from cafe.drivers.unittest.fixtures import BaseTestFixture
from cafe.common.reporting.cclogging import log_results
from cafe.drivers.unittest.parsers import SummarizeResults
from cafe.drivers.unittest.decorators import (
TAGS_DECORATOR_TAG_LIST_NAME, TAGS_DECORATOR_ATTR_DICT_NAME)
from cafe.common.reporting.reporter import Reporter
from cafe.configurator.managers import TestEnvManager
from cafe.drivers.unittest.decorators import (
TAGS_DECORATOR_TAG_LIST_NAME, TAGS_DECORATOR_ATTR_DICT_NAME)
from cafe.drivers.unittest.parsers import SummarizeResults
from cafe.drivers.unittest.suite import OpenCafeUnittestTestSuite
@@ -72,29 +71,6 @@ def tree(directory, padding, print_files=False):
print "{0}{1}".format(padding, file_name)
def error_msg(e_type, e_msg):
"""
creates an error message
"""
err_msg = "<[WARNING {0} ERROR {1}]>".format(str(e_type), str(e_msg))
return err_msg
def print_traceback():
"""
formats and prints out a minimal stack trace
"""
info = sys.exc_info()
excp_type, excp_value = info[:2]
err_msg = error_msg(excp_type.__name__, excp_value)
print err_msg
for file_name, lineno, function, text in extract_tb(info[2]):
print ">>>", file_name
print " > line", lineno, "in", function, repr(text)
print "-" * 100
class _WritelnDecorator(object):
"""Used to decorate file-like objects with a handy "writeln" method"""
@@ -129,162 +105,16 @@ class OpenCafeParallelTextTestRunner(unittest.TextTestRunner):
return result
class LoadedTestClass(object):
def __init__(self, loaded_module):
self.module = loaded_module
self.module_path = self._get_module_path(loaded_module)
self.module_name = self._get_module_name(loaded_module)
def _get_class_names(self, loaded_module):
"""
gets all the class names in an imported module
"""
for _, obj in getmembers(loaded_module, isclass):
temp_obj = obj
try:
while temp_obj.__base__ != object:
if (temp_obj.__base__ == unittest.TestCase
or temp_obj.__base__ == BaseTestFixture
and temp_obj != obj.__base__):
if (not search("fixture", obj.__name__.lower())
and obj.__test__):
yield obj.__name__
break
else:
temp_obj = temp_obj.__base__
except AttributeError:
continue
def _get_class(self, loaded_module, test_class_name):
class_ = None
try:
class_ = getattr(loaded_module, test_class_name)
except AttributeError, e:
print e
return None
return class_
def get_instances(self):
loaded = []
for class_name in self._get_class_names(self.module):
if class_name not in loaded:
loaded.append(class_name)
yield self._get_class(self.module, class_name)
def _get_module_path(self, loaded_module):
return os.path.dirname(loaded_module.__file__)
def _get_module_name(self, loaded_module):
return loaded_module.__name__.split(".")[1]
class SuiteBuilder(object):
def __init__(self, module_regex, method_regex, cl_tags, supress_flag):
self.module_regex = module_regex
self.method_regex = method_regex
self.tags = cl_tags
self.supress = supress_flag
def get_dotted_path(self, path, split_token):
"""
creates a dotted path for use by unittest"s loader
"""
try:
position = len(path.split(split_token)) - 1
temp_path = "{0}{1}".format(
split_token, path.split(split_token)[position])
split_path = os.path.split(temp_path)
dotted_path = ".".join(split_path)
except AttributeError:
return None
except Exception:
return None
return dotted_path
def find_root(self, path, target):
"""
walks the path searching for the target root folder.
"""
for root, _, _ in os.walk(path):
if target in os.path.basename(root):
return root
def find_file(self, path, target):
"""
walks the path searching for the target file. the full to the target
file is returned
"""
for root, _, files in os.walk(path):
for file_name in files:
if (target in file_name and not file_name.endswith(".pyc")):
return os.path.join(root, file_name)
def find_subdir(self, path, target):
"""
Walks the path searching for the target subdirectory.
The full path to the target subdirectory is returned
"""
for root, dirs, _ in os.walk(path):
for dir_name in dirs:
if target in dir_name:
return os.path.join(root, dir_name)
def drill_path(self, path, target):
"""
walks the path searching for the last instance of the target path.
"""
return_path = {}
for root, _, _ in os.walk(path):
if target in os.path.basename(root):
return_path[target] = root
return return_path
def load_module(self, module_path):
"""
uses imp to load a module
"""
loaded_module = None
module_name = os.path.basename(module_path)
package_path = os.path.dirname(module_path)
pkg_name = os.path.basename(package_path)
root_path = os.path.dirname(package_path)
module_name = module_name.split('.')[0]
f, p_path, description = imp.find_module(pkg_name, [root_path])
loaded_pkg = imp.load_module(pkg_name, f, p_path, description)
f, m_path, description = imp.find_module(
module_name, loaded_pkg.__path__)
try:
mod = "{0}.{1}".format(loaded_pkg.__name__, module_name)
loaded_module = imp.load_module(mod, f, m_path, description)
except ImportError:
raise
return loaded_module
def get_modules(self, rootdir):
"""
generator yields modules matching the module_regex
"""
for root, _, files in os.walk(rootdir):
for name in files:
if (fnmatch(name, self.module_regex)
and name != "__init__.py"
and not name.endswith(".pyc")):
file_name = name.split(".")[0]
full_path = "{0}/{1}".format(root, file_name)
yield full_path
def __init__(self, cl_args, test_repo_name):
self.packages = cl_args.packages or []
self.module_regex = cl_args.module_regex
self.method_regex = cl_args.method_regex
self.tags = cl_args.tags
self.supress = cl_args.supress_flag
self.product = cl_args.product
self.test_repo_name = test_repo_name
def _parse_tags(self, tags):
"""
@@ -345,6 +175,9 @@ class SuiteBuilder(object):
return match == set(tags) if token == "+" else bool(match)
def _check_method(self, class_, method_name, tags, attrs, token):
"""
checks tags on methods
"""
load_test_flag = False
attr_flag = False
tag_flag = False
@@ -368,7 +201,20 @@ class SuiteBuilder(object):
return load_test_flag
def build_suite(self, loaded_module):
def get_classes(self, loaded_module):
"""
finds all the classes in a loaded module calculates full path to class
"""
classes = []
for objname in dir(loaded_module):
obj = getattr(loaded_module, objname, None)
if (isclass(obj) and issubclass(obj, unittest.TestCase) and
"fixture" not in obj.__name__.lower() and
getattr(obj, "__test__", True)):
classes.append(obj)
return classes
def build_suite(self, module_path):
"""
loads the found tests and builds the test suite
"""
@@ -376,64 +222,92 @@ class SuiteBuilder(object):
attrs = {}
loader = unittest.TestLoader()
suite = OpenCafeUnittestTestSuite()
loaded = LoadedTestClass(loaded_module)
try:
loaded_module = import_module(module_path)
except Exception as e:
sys.stderr.write("{0}\n".format("=" * 70))
sys.stderr.write(
"Failed to load module '{0}'\n".format(module_path, e))
sys.stderr.write("{0}\n".format("-" * 70))
print_exc(file=sys.stderr)
sys.stderr.write("{0}\n".format("-" * 70))
return
if self.tags:
tag_list, attrs, token = self._parse_tags(self.tags)
if (hasattr(loaded_module, "load_tests")
and not self.supress
and self.method_regex == "test_*"
and not self.method_regex
and self.tags is None):
load_tests = getattr(loaded_module, "load_tests")
suite.addTests(load_tests(loader, None, None))
return suite
for class_ in loaded.get_instances():
for class_ in self.get_classes(loaded_module):
for method_name in dir(class_):
load_test_flag = False
if fnmatch(method_name, self.method_regex):
if not self.tags:
load_test_flag = True
else:
load_test_flag = self._check_method(
class_, method_name, tag_list, attrs, token)
if load_test_flag:
suite.addTest(class_(method_name))
if (method_name.startswith("test_") and
search(self.method_regex, method_name) and
(not self.tags or self._check_method(
class_, method_name, tag_list, attrs, token))):
suite.addTest(class_(method_name))
return suite
def get_tests(self, module_path):
suite = OpenCafeUnittestTestSuite()
def get_modules(self):
"""
walks all modules in the test repo, filters by
product and module filter. Filter passed in with -m
returns a list of module dotpath strings
"""
test_repo = import_module(self.test_repo_name)
prefix = "{0}.".format(test_repo.__name__)
product_path = "{0}{1}".format(prefix, self.product)
modnames = []
for importer, modname, is_pkg in pkgutil.walk_packages(
path=test_repo.__path__, prefix=prefix,
onerror=lambda x: None):
if not is_pkg and modname.startswith(product_path):
if (not self.module_regex or
self.module_regex in modname.rsplit(".", 1)[1]):
modnames.append(modname)
try:
loaded_module = self.load_module(module_path)
except ImportError:
print_traceback()
return None
filter_mods = []
for modname in modnames:
add_package = not bool(self.packages)
for package in self.packages:
if package in modname.rsplit(".", 1)[0]:
add_package = True
break
if add_package:
filter_mods.append(modname)
filter_mods.sort()
return filter_mods
try:
suite = self.build_suite(loaded_module)
except Exception:
print_traceback()
return None
def generate_suite(self):
"""
creates a single unittest test suite
"""
master_suite = OpenCafeUnittestTestSuite()
modules = self.get_modules()
return suite
def generate_suite(self, path, suite=None):
master_suite = suite or OpenCafeUnittestTestSuite()
for module_path in self.get_modules(path):
st = self.get_tests(module_path)
if st:
master_suite.addTests(st)
for modname in modules:
suite = self.build_suite(modname)
if suite:
master_suite.addTests(suite)
return master_suite
def generate_suite_list(self, path, suite_list=None):
master_suite_list = suite_list or []
for module_path in self.get_modules(path):
st = self.get_tests(module_path)
if st:
master_suite_list.append(st)
def generate_suite_list(self):
"""
creates a list containing unittest test suites
"""
master_suite_list = []
modules = self.get_modules()
for modname in modules:
suite = self.build_suite(modname)
if suite and len(suite._tests):
master_suite_list.append(suite)
return master_suite_list
@@ -516,25 +390,6 @@ class _UnittestRunnerCLI(object):
setattr(namespace, self.dest, values)
class ModuleRegexAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
# Make sure user-specified module name has a .py at the end of it
if ".py" not in str(values):
values = "{0}{1}".format(values, ".py")
setattr(namespace, self.dest, values)
class MethodRegexAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
# Make sure user-specified method name has test_ at the start of it
if 'test_' not in str(values):
values = "{0}{1}".format("test_", values)
setattr(namespace, self.dest, values)
class DataAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
@@ -675,7 +530,7 @@ class _UnittestRunnerCLI(object):
""".format(
baseargs=base, package=pkg, module=mod, method=mthd, tags=testtag)
desc = "Cloud Common Automated Engine Framework"
desc = "Open Common Automation Framework Engine"
argparser = argparse.ArgumentParser(
usage=usage_string, description=desc)
@@ -726,25 +581,22 @@ class _UnittestRunnerCLI(object):
argparser.add_argument(
"-p", "--packages",
nargs="*",
nargs="+",
default=None,
metavar="[package(s)]",
help="test package(s) in the product's "
"test repo")
metavar="<string filter>",
help="Package Filter")
argparser.add_argument(
"-m", "--module-regex", "--module",
action=self.ModuleRegexAction,
default="*.py",
metavar="<module>",
help="test module regex - defaults to '*.py'")
default="",
metavar="<string filter>",
help="Test module filter")
argparser.add_argument(
"-M", "--method-regex",
action=self.MethodRegexAction,
default="test_*",
metavar="<method>",
help="test method regex defaults to 'test_'")
default="",
metavar="<string filter>",
help="Test method filter. defaults to 'test_'")
argparser.add_argument(
"-t", "--tags",
@@ -767,6 +619,11 @@ class _UnittestRunnerCLI(object):
action="store_true",
default=False)
argparser.add_argument(
"--dry-run",
action="store_true",
default=False)
argparser.add_argument(
"--data-directory",
action=self.DataDirectoryAction,
@@ -815,9 +672,10 @@ class UnittestRunner(object):
# finalize() is called
self.test_env.test_data_directory = (
self.test_env.test_data_directory or self.cl_args.data_directory)
self.product_repo_path = os.path.join(
self.test_env.test_repo_path, self.cl_args.product)
self.test_env.finalize()
self.product = self.cl_args.product
self.test_repo = (
self.test_env.engine_config_interface.default_test_repo)
self.print_mug_and_paths(self.test_env)
@staticmethod
@@ -848,17 +706,17 @@ class UnittestRunner(object):
results.update({test_id: result})
@staticmethod
def get_runner(parallel, fail_fast, verbosity):
def get_runner(cl_args):
test_runner = None
# Use the parallel text runner so the console logs look correct
if parallel:
if cl_args.parallel:
test_runner = OpenCafeParallelTextTestRunner(
verbosity=int(verbosity))
verbosity=cl_args.verbose)
else:
test_runner = unittest.TextTestRunner(verbosity=int(verbosity))
test_runner = unittest.TextTestRunner(verbosity=cl_args.verbose)
test_runner.failfast = fail_fast
test_runner.failfast = cl_args.fail_fast
return test_runner
@staticmethod
@@ -892,39 +750,27 @@ class UnittestRunner(object):
master_suite = OpenCafeUnittestTestSuite()
parallel_test_list = []
builder = SuiteBuilder(
self.cl_args.module_regex, self.cl_args.method_regex,
self.cl_args.tags, self.cl_args.supress_flag)
test_runner = self.get_runner(
self.cl_args.parallel, self.cl_args.fail_fast,
self.cl_args.verbose)
# Build master test suite
if self.cl_args.packages:
for package_name in self.cl_args.packages:
path = builder.find_subdir(
self.product_repo_path, package_name)
if path is None:
print error_msg("PACKAGE", package_name)
continue
master_suite = builder.generate_suite(path, master_suite)
if self.cl_args.parallel:
parallel_test_list = builder.generate_suite_list(
path, parallel_test_list)
else:
master_suite = builder.generate_suite(self.product_repo_path)
if self.cl_args.parallel:
parallel_test_list = builder.generate_suite_list(
self.product_repo_path)
builder = SuiteBuilder(self.cl_args, self.test_repo)
test_runner = self.get_runner(self.cl_args)
if self.cl_args.parallel:
parallel_test_list = builder.generate_suite_list()
if self.cl_args.dry_run:
for suite in parallel_test_list:
for test in suite:
print test
exit(0)
exit_code = self.run_parallel(
parallel_test_list, test_runner,
result_type=self.cl_args.result,
results_path=self.cl_args.result_directory)
exit(exit_code)
else:
master_suite = builder.generate_suite()
if self.cl_args.dry_run:
for test in master_suite:
print test
exit(0)
exit_code = self.run_serialized(
master_suite, test_runner, result_type=self.cl_args.result,
results_path=self.cl_args.result_directory)