diff --git a/tempest/test.py b/tempest/test.py deleted file mode 100644 index 68806c2..0000000 --- a/tempest/test.py +++ /dev/null @@ -1,618 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import atexit -import functools -import json -import os -import re -import sys -import time -import urllib -import uuid - -import fixtures -import testresources -import testscenarios -import testtools - -from tempest import clients -import tempest.common.generator.valid_generator as valid -from tempest.common import isolated_creds -from tempest import config -from tempest import exceptions -from tempest.openstack.common import importutils -from tempest.openstack.common import log as logging - -LOG = logging.getLogger(__name__) - -CONF = config.CONF - -# All the successful HTTP status codes from RFC 2616 -HTTP_SUCCESS = (200, 201, 202, 203, 204, 205, 206) - - -def attr(*args, **kwargs): - """A decorator which applies the testtools attr decorator - - This decorator applies the testtools.testcase.attr if it is in the list of - attributes to testtools we want to apply. - """ - - def decorator(f): - if 'type' in kwargs and isinstance(kwargs['type'], str): - f = testtools.testcase.attr(kwargs['type'])(f) - if kwargs['type'] == 'smoke': - f = testtools.testcase.attr('gate')(f) - elif 'type' in kwargs and isinstance(kwargs['type'], list): - for attr in kwargs['type']: - f = testtools.testcase.attr(attr)(f) - if attr == 'smoke': - f = testtools.testcase.attr('gate')(f) - return f - - return decorator - - -def safe_setup(f): - """A decorator used to wrap the setUpClass for cleaning up resources - when setUpClass failed. - """ - - def decorator(cls): - try: - f(cls) - except Exception as se: - etype, value, trace = sys.exc_info() - LOG.exception("setUpClass failed: %s" % se) - try: - cls.tearDownClass() - except Exception as te: - LOG.exception("tearDownClass failed: %s" % te) - try: - raise etype(value), None, trace - finally: - del trace # for avoiding circular refs - - return decorator - - -def services(*args, **kwargs): - """A decorator used to set an attr for each service used in a test case - - This decorator applies a testtools attr for each service that gets - exercised by a test case. - """ - service_list = { - 'compute': CONF.service_available.nova, - 'image': CONF.service_available.glance, - 'baremetal': CONF.service_available.ironic, - 'volume': CONF.service_available.cinder, - 'orchestration': CONF.service_available.heat, - # NOTE(mtreinish) nova-network will provide networking functionality - # if neutron isn't available, so always set to True. - 'network': True, - 'identity': True, - 'object_storage': CONF.service_available.swift, - 'dashboard': CONF.service_available.horizon, - 'telemetry': CONF.service_available.ceilometer, - 'data_processing': CONF.service_available.sahara - } - - def decorator(f): - for service in args: - if service not in service_list: - raise exceptions.InvalidServiceTag('%s is not a valid service' - % service) - attr(type=list(args))(f) - - @functools.wraps(f) - def wrapper(self, *func_args, **func_kwargs): - for service in args: - if not service_list[service]: - msg = 'Skipped because the %s service is not available' % ( - service) - raise testtools.TestCase.skipException(msg) - return f(self, *func_args, **func_kwargs) - return wrapper - return decorator - - -def stresstest(*args, **kwargs): - """Add stress test decorator - - For all functions with this decorator a attr stress will be - set automatically. - - @param class_setup_per: allowed values are application, process, action - ``application``: once in the stress job lifetime - ``process``: once in the worker process lifetime - ``action``: on each action - @param allow_inheritance: allows inheritance of this attribute - """ - def decorator(f): - if 'class_setup_per' in kwargs: - setattr(f, "st_class_setup_per", kwargs['class_setup_per']) - else: - setattr(f, "st_class_setup_per", 'process') - if 'allow_inheritance' in kwargs: - setattr(f, "st_allow_inheritance", kwargs['allow_inheritance']) - else: - setattr(f, "st_allow_inheritance", False) - attr(type='stress')(f) - return f - return decorator - - -def skip_because(*args, **kwargs): - """A decorator useful to skip tests hitting known bugs - - @param bug: bug number causing the test to skip - @param condition: optional condition to be True for the skip to have place - @param interface: skip the test if it is the same as self._interface - """ - def decorator(f): - @functools.wraps(f) - def wrapper(self, *func_args, **func_kwargs): - skip = False - if "condition" in kwargs: - if kwargs["condition"] is True: - skip = True - elif "interface" in kwargs: - if kwargs["interface"] == self._interface: - skip = True - else: - skip = True - if "bug" in kwargs and skip is True: - if not kwargs['bug'].isdigit(): - raise ValueError('bug must be a valid bug number') - msg = "Skipped until Bug: %s is resolved." % kwargs["bug"] - raise testtools.TestCase.skipException(msg) - return f(self, *func_args, **func_kwargs) - return wrapper - return decorator - - -def requires_ext(*args, **kwargs): - """A decorator to skip tests if an extension is not enabled - - @param extension - @param service - """ - def decorator(func): - @functools.wraps(func) - def wrapper(*func_args, **func_kwargs): - if not is_extension_enabled(kwargs['extension'], - kwargs['service']): - msg = "Skipped because %s extension: %s is not enabled" % ( - kwargs['service'], kwargs['extension']) - raise testtools.TestCase.skipException(msg) - return func(*func_args, **func_kwargs) - return wrapper - return decorator - - -def is_extension_enabled(extension_name, service): - """A function that will check the list of enabled extensions from config - - """ - config_dict = { - 'compute': CONF.compute_feature_enabled.api_extensions, - 'compute_v3': CONF.compute_feature_enabled.api_v3_extensions, - 'volume': CONF.volume_feature_enabled.api_extensions, - 'network': CONF.network_feature_enabled.api_extensions, - 'object': CONF.object_storage_feature_enabled.discoverable_apis, - } - if len(config_dict[service]) == 0: - return False - if config_dict[service][0] == 'all': - return True - if extension_name in config_dict[service]: - return True - return False - - -at_exit_set = set() - - -def validate_tearDownClass(): - if at_exit_set: - LOG.error( - "tearDownClass does not call the super's " - "tearDownClass in these classes: \n" - + str(at_exit_set)) - - -atexit.register(validate_tearDownClass) - -if sys.version_info >= (2, 7): - class BaseDeps(testtools.TestCase, - testtools.testcase.WithAttributes, - testresources.ResourcedTestCase): - pass -else: - # Define asserts for py26 - import unittest2 - - class BaseDeps(testtools.TestCase, - testtools.testcase.WithAttributes, - testresources.ResourcedTestCase, - unittest2.TestCase): - pass - - -class BaseTestCase(BaseDeps): - - setUpClassCalled = False - _service = None - - network_resources = {} - - # NOTE(sdague): log_format is defined inline here instead of using the oslo - # default because going through the config path recouples config to the - # stress tests too early, and depending on testr order will fail unit tests - log_format = ('%(asctime)s %(process)d %(levelname)-8s ' - '[%(name)s] %(message)s') - - @classmethod - def setUpClass(cls): - if hasattr(super(BaseTestCase, cls), 'setUpClass'): - super(BaseTestCase, cls).setUpClass() - cls.setUpClassCalled = True - - @classmethod - def tearDownClass(cls): - at_exit_set.discard(cls) - if hasattr(super(BaseTestCase, cls), 'tearDownClass'): - super(BaseTestCase, cls).tearDownClass() - - def setUp(self): - super(BaseTestCase, self).setUp() - if not self.setUpClassCalled: - raise RuntimeError("setUpClass does not calls the super's" - "setUpClass in the " - + self.__class__.__name__) - at_exit_set.add(self.__class__) - test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) - try: - test_timeout = int(test_timeout) - except ValueError: - test_timeout = 0 - if test_timeout > 0: - self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) - - if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or - os.environ.get('OS_STDOUT_CAPTURE') == '1'): - stdout = self.useFixture(fixtures.StringStream('stdout')).stream - self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) - if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or - os.environ.get('OS_STDERR_CAPTURE') == '1'): - stderr = self.useFixture(fixtures.StringStream('stderr')).stream - self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) - if (os.environ.get('OS_LOG_CAPTURE') != 'False' and - os.environ.get('OS_LOG_CAPTURE') != '0'): - self.useFixture(fixtures.LoggerFixture(nuke_handlers=False, - format=self.log_format, - level=None)) - - @classmethod - def get_client_manager(cls, interface=None): - """ - Returns an OpenStack client manager - """ - cls.isolated_creds = isolated_creds.IsolatedCreds( - cls.__name__, network_resources=cls.network_resources) - - force_tenant_isolation = getattr(cls, 'force_tenant_isolation', None) - if CONF.compute.allow_tenant_isolation or force_tenant_isolation: - creds = cls.isolated_creds.get_primary_creds() - if getattr(cls, '_interface', None): - os = clients.Manager(credentials=creds, - interface=cls._interface, - service=cls._service) - elif interface: - os = clients.Manager(credentials=creds, - interface=interface, - service=cls._service) - else: - os = clients.Manager(credentials=creds, - service=cls._service) - else: - if getattr(cls, '_interface', None): - os = clients.Manager(interface=cls._interface, - service=cls._service) - elif interface: - os = clients.Manager(interface=interface, service=cls._service) - else: - os = clients.Manager(service=cls._service) - return os - - @classmethod - def clear_isolated_creds(cls): - """ - Clears isolated creds if set - """ - if getattr(cls, 'isolated_creds'): - cls.isolated_creds.clear_isolated_creds() - - @classmethod - def _get_identity_admin_client(cls): - """ - Returns an instance of the Identity Admin API client - """ - os = clients.AdminManager(interface=cls._interface, - service=cls._service) - admin_client = os.identity_client - return admin_client - - @classmethod - def set_network_resources(self, network=False, router=False, subnet=False, - dhcp=False): - """Specify which network resources should be created - - @param network - @param router - @param subnet - @param dhcp - """ - # network resources should be set only once from callers - # in order to ensure that even if it's called multiple times in - # a chain of overloaded methods, the attribute is set only - # in the leaf class - if not self.network_resources: - self.network_resources = { - 'network': network, - 'router': router, - 'subnet': subnet, - 'dhcp': dhcp} - - def assertEmpty(self, list, msg=None): - self.assertTrue(len(list) == 0, msg) - - def assertNotEmpty(self, list, msg=None): - self.assertTrue(len(list) > 0, msg) - - -class NegativeAutoTest(BaseTestCase): - - _resources = {} - - @classmethod - def setUpClass(cls): - super(NegativeAutoTest, cls).setUpClass() - os = cls.get_client_manager() - cls.client = os.negative_client - os_admin = clients.AdminManager(interface=cls._interface, - service=cls._service) - cls.admin_client = os_admin.negative_client - - @staticmethod - def load_tests(*args): - """ - Wrapper for testscenarios to set the mandatory scenarios variable - only in case a real test loader is in place. Will be automatically - called in case the variable "load_tests" is set. - """ - if getattr(args[0], 'suiteClass', None) is not None: - loader, standard_tests, pattern = args - else: - standard_tests, module, loader = args - for test in testtools.iterate_tests(standard_tests): - schema_file = getattr(test, '_schema_file', None) - schema = getattr(test, '_schema', None) - if schema_file is not None: - setattr(test, 'scenarios', - NegativeAutoTest.generate_scenario(schema_file)) - elif schema is not None: - setattr(test, 'scenarios', - NegativeAutoTest.generate_scenario(schema)) - return testscenarios.load_tests_apply_scenarios(*args) - - @staticmethod - def generate_scenario(description): - """ - Generates the test scenario list for a given description. - - :param description: A file or dictionary with the following entries: - name (required) name for the api - http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE - url (required) the url to be appended to the catalog url with '%s' - for each resource mentioned - resources: (optional) A list of resource names such as "server", - "flavor", etc. with an element for each '%s' in the url. This - method will call self.get_resource for each element when - constructing the positive test case template so negative - subclasses are expected to return valid resource ids when - appropriate. - json-schema (optional) A valid json schema that will be used to - create invalid data for the api calls. For "GET" and "HEAD", - the data is used to generate query strings appended to the url, - otherwise for the body of the http call. - """ - LOG.debug(description) - generator = importutils.import_class( - CONF.negative.test_generator)() - generator.validate_schema(description) - schema = description.get("json-schema", None) - resources = description.get("resources", []) - scenario_list = [] - expected_result = None - for resource in resources: - if isinstance(resource, dict): - expected_result = resource['expected_result'] - resource = resource['name'] - LOG.debug("Add resource to test %s" % resource) - scn_name = "inv_res_%s" % (resource) - scenario_list.append((scn_name, {"resource": (resource, - str(uuid.uuid4())), - "expected_result": expected_result - })) - if schema is not None: - for name, schema, expected_result in generator.generate(schema): - if (expected_result is None and - "default_result_code" in description): - expected_result = description["default_result_code"] - scenario_list.append((name, - {"schema": schema, - "expected_result": expected_result})) - LOG.debug(scenario_list) - return scenario_list - - def execute(self, description): - """ - Execute a http call on an api that are expected to - result in client errors. First it uses invalid resources that are part - of the url, and then invalid data for queries and http request bodies. - - :param description: A json file or dictionary with the following - entries: - name (required) name for the api - http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE - url (required) the url to be appended to the catalog url with '%s' - for each resource mentioned - resources: (optional) A list of resource names such as "server", - "flavor", etc. with an element for each '%s' in the url. This - method will call self.get_resource for each element when - constructing the positive test case template so negative - subclasses are expected to return valid resource ids when - appropriate. - json-schema (optional) A valid json schema that will be used to - create invalid data for the api calls. For "GET" and "HEAD", - the data is used to generate query strings appended to the url, - otherwise for the body of the http call. - - """ - LOG.info("Executing %s" % description["name"]) - LOG.debug(description) - method = description["http-method"] - url = description["url"] - - resources = [self.get_resource(r) for - r in description.get("resources", [])] - - if hasattr(self, "resource"): - # Note(mkoderer): The resources list already contains an invalid - # entry (see get_resource). - # We just send a valid json-schema with it - valid_schema = None - schema = description.get("json-schema", None) - if schema: - valid_schema = \ - valid.ValidTestGenerator().generate_valid(schema) - new_url, body = self._http_arguments(valid_schema, url, method) - elif hasattr(self, "schema"): - new_url, body = self._http_arguments(self.schema, url, method) - else: - raise Exception("testscenarios are not active. Please make sure " - "that your test runner supports the load_tests " - "mechanism") - - if "admin_client" in description and description["admin_client"]: - client = self.admin_client - else: - client = self.client - resp, resp_body = client.send_request(method, new_url, - resources, body=body) - self._check_negative_response(resp.status, resp_body) - - def _http_arguments(self, json_dict, url, method): - LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method)) - if not json_dict: - return url, None - elif method in ["GET", "HEAD", "PUT", "DELETE"]: - return "%s?%s" % (url, urllib.urlencode(json_dict)), None - else: - return url, json.dumps(json_dict) - - def _check_negative_response(self, result, body): - expected_result = getattr(self, "expected_result", None) - self.assertTrue(result >= 400 and result < 500 and result != 413, - "Expected client error, got %s:%s" % - (result, body)) - self.assertTrue(expected_result is None or expected_result == result, - "Expected %s, got %s:%s" % - (expected_result, result, body)) - - @classmethod - def set_resource(cls, name, resource): - """ - This function can be used in setUpClass context to register a resoruce - for a test. - - :param name: The name of the kind of resource such as "flavor", "role", - etc. - :resource: The id of the resource - """ - cls._resources[name] = resource - - def get_resource(self, name): - """ - Return a valid uuid for a type of resource. If a real resource is - needed as part of a url then this method should return one. Otherwise - it can return None. - - :param name: The name of the kind of resource such as "flavor", "role", - etc. - """ - if isinstance(name, dict): - name = name['name'] - if hasattr(self, "resource") and self.resource[0] == name: - LOG.debug("Return invalid resource (%s) value: %s" % - (self.resource[0], self.resource[1])) - return self.resource[1] - if name in self._resources: - return self._resources[name] - return None - - -def SimpleNegativeAutoTest(klass): - """ - This decorator registers a test function on basis of the class name. - """ - @attr(type=['negative', 'gate']) - def generic_test(self): - if hasattr(self, '_schema'): - self.execute(self._schema) - - cn = klass.__name__ - cn = cn.replace('JSON', '') - cn = cn.replace('Test', '') - # NOTE(mkoderer): replaces uppercase chars inside the class name with '_' - lower_cn = re.sub('(?= (2, 7): + class BaseDeps(testtools.TestCase, + testtools.testcase.WithAttributes, + testresources.ResourcedTestCase): + pass +else: + # Define asserts for py26 + import unittest2 + + class BaseDeps(testtools.TestCase, + testtools.testcase.WithAttributes, + testresources.ResourcedTestCase, + unittest2.TestCase): + pass + +at_exit_set = set() + + +def validate_tearDownClass(): + if at_exit_set: + LOG.error("tearDownClass does not call the super's " + "tearDownClass in these classes: \n" + + str(at_exit_set)) + + +class BaseTestCase(BaseDeps): + setUpClassCalled = False + + @classmethod + def setUpClass(cls): + if hasattr(super(BaseTestCase, cls), 'setUpClass'): + super(BaseTestCase, cls).setUpClass() + cls.setUpClassCalled = True + + @classmethod + def tearDownClass(cls): + at_exit_set.discard(cls) + if hasattr(super(BaseTestCase, cls), 'tearDownClass'): + super(BaseTestCase, cls).tearDownClass() + + def setUp(self): + super(BaseTestCase, self).setUp() + if not self.setUpClassCalled: + raise RuntimeError("setUpClass does not calls the super's" + "setUpClass in the " + + self.__class__.__name__) + at_exit_set.add(self.__class__) + test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) + try: + test_timeout = int(test_timeout) + except ValueError: + test_timeout = 0 + if test_timeout > 0: + self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) + + if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or + os.environ.get('OS_STDOUT_CAPTURE') == '1'): + stdout = self.useFixture(fixtures.StringStream('stdout')).stream + self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) + if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or + os.environ.get('OS_STDERR_CAPTURE') == '1'): + stderr = self.useFixture(fixtures.StringStream('stderr')).stream + self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) + if (os.environ.get('OS_LOG_CAPTURE') != 'False' and + os.environ.get('OS_LOG_CAPTURE') != '0'): + self.useFixture(fixtures.LoggerFixture(nuke_handlers=False, + format=self.log_format, + level=None))