Merge "Negative test autogeneration framework"

This commit is contained in:
Jenkins 2014-02-04 14:59:01 +00:00 committed by Gerrit Code Review
commit 90da1790db
10 changed files with 639 additions and 43 deletions

View File

@ -0,0 +1,6 @@
{
"name": "get-flavor-details",
"http-method": "GET",
"url": "flavors/%s",
"resources": ["flavor"]
}

View File

@ -0,0 +1,24 @@
{
"name": "list-flavors-with-detail",
"http-method": "GET",
"url": "flavors/detail",
"json-schema": {
"type": "object",
"properties": {
"minRam": {
"type": "integer",
"results": {
"gen_none": 400,
"gen_string": 400
}
},
"minDisk": {
"type": "integer",
"results": {
"gen_none": 400,
"gen_string": 400
}
}
}
}
}

View File

@ -0,0 +1,21 @@
{
"name": "get-console-output",
"http-method": "POST",
"url": "servers/%s/action",
"resources": ["server"],
"json-schema": {
"type": "object",
"properties": {
"os-getConsoleOutput": {
"type": "object",
"properties": {
"length": {
"type": ["integer", "string"],
"minimum": 0
}
}
}
},
"additionalProperties": false
}
}

View File

@ -13,40 +13,42 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import testscenarios
from tempest.api.compute import base
from tempest import exceptions
from tempest.test import attr
from tempest import test
class FlavorsNegativeTestJSON(base.BaseV2ComputeTest):
load_tests = testscenarios.load_tests_apply_scenarios
class FlavorsListNegativeTestJSON(base.BaseV2ComputeTest,
test.NegativeAutoTest):
_interface = 'json'
_service = 'compute'
_schema_file = 'compute/flavors/flavors_list.json'
scenarios = test.NegativeAutoTest.generate_scenario(_schema_file)
@test.attr(type=['negative', 'gate'])
def test_list_flavors_with_detail(self):
self.execute(self._schema_file)
class FlavorDetailsNegativeTestJSON(base.BaseV2ComputeTest,
test.NegativeAutoTest):
_interface = 'json'
_service = 'compute'
_schema_file = 'compute/flavors/flavor_details.json'
scenarios = test.NegativeAutoTest.generate_scenario(_schema_file)
@classmethod
def setUpClass(cls):
super(FlavorsNegativeTestJSON, cls).setUpClass()
cls.client = cls.flavors_client
super(FlavorDetailsNegativeTestJSON, cls).setUpClass()
cls.set_resource("flavor", cls.flavor_ref)
@attr(type=['negative', 'gate'])
def test_invalid_minRam_filter(self):
self.assertRaises(exceptions.BadRequest,
self.client.list_flavors_with_detail,
{'minRam': 'invalid'})
@attr(type=['negative', 'gate'])
def test_invalid_minDisk_filter(self):
self.assertRaises(exceptions.BadRequest,
self.client.list_flavors_with_detail,
{'minDisk': 'invalid'})
@attr(type=['negative', 'gate'])
def test_non_existent_flavor_id(self):
@test.attr(type=['negative', 'gate'])
def test_get_flavor_details(self):
# flavor details are not returned for non-existent flavors
nonexistent_flavor_id = str(uuid.uuid4())
self.assertRaises(exceptions.NotFound, self.client.get_flavor_details,
nonexistent_flavor_id)
class FlavorsNegativeTestXML(FlavorsNegativeTestJSON):
_interface = 'xml'
self.execute(self._schema_file)

View File

@ -0,0 +1,48 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from tempest.api.compute import base
from tempest import exceptions
from tempest.test import attr
class FlavorsNegativeTestXML(base.BaseV2ComputeTest):
_interface = 'xml'
@classmethod
def setUpClass(cls):
super(FlavorsNegativeTestXML, cls).setUpClass()
cls.client = cls.flavors_client
@attr(type=['negative', 'gate'])
def test_invalid_minRam_filter(self):
self.assertRaises(exceptions.BadRequest,
self.client.list_flavors_with_detail,
{'minRam': 'invalid'})
@attr(type=['negative', 'gate'])
def test_invalid_minDisk_filter(self):
self.assertRaises(exceptions.BadRequest,
self.client.list_flavors_with_detail,
{'minDisk': 'invalid'})
@attr(type=['negative', 'gate'])
def test_non_existent_flavor_id(self):
# flavor details are not returned for non-existent flavors
nonexistent_flavor_id = str(uuid.uuid4())
self.assertRaises(exceptions.NotFound, self.client.get_flavor_details,
nonexistent_flavor_id)

View File

@ -0,0 +1,41 @@
# Copyright 2014 Red Hat, Inc & Deutsche Telekom AG
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from tempest.api.compute import base
from tempest import test
load_tests = testscenarios.load_tests_apply_scenarios
class GetConsoleOutputNegativeTestJSON(base.BaseV2ComputeTest,
test.NegativeAutoTest):
_interface = 'json'
_service = 'compute'
_schema_file = 'compute/servers/get_console_output.json'
scenarios = test.NegativeAutoTest.generate_scenario(_schema_file)
@classmethod
def setUpClass(cls):
super(GetConsoleOutputNegativeTestJSON, cls).setUpClass()
_resp, server = cls.create_test_server()
cls.set_resource("server", server['id'])
@test.attr(type=['negative', 'gate'])
def test_get_console_output(self):
self.execute(self._schema_file)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.rest_client import NegativeRestClient
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
@ -174,7 +175,7 @@ class Manager(object):
"""
def __init__(self, username=None, password=None, tenant_name=None,
interface='json'):
interface='json', service=None):
"""
We allow overriding of the credentials used within the various
client classes managed by the Manager object. Left as None, the
@ -323,10 +324,15 @@ class Manager(object):
self.hosts_v3_client = HostsV3ClientJSON(*client_args)
if CONF.service_available.ceilometer:
self.telemetry_client = TelemetryClientJSON(*client_args)
self.negative_client = NegativeRestClient(*client_args)
self.negative_client.service = service
if client_args_v3_auth:
self.servers_client_v3_auth = ServersClientJSON(
*client_args_v3_auth)
self.negative_v3_client = NegativeRestClient(
*client_args_v3_auth)
self.negative_v3_client.service = service
else:
msg = "Unsupported interface type `%s'" % interface
raise exceptions.InvalidConfiguration(msg)
@ -354,11 +360,12 @@ class AltManager(Manager):
managed client objects
"""
def __init__(self, interface='json'):
def __init__(self, interface='json', service=None):
super(AltManager, self).__init__(CONF.identity.alt_username,
CONF.identity.alt_password,
CONF.identity.alt_tenant_name,
interface=interface)
interface=interface,
service=service)
class AdminManager(Manager):
@ -368,11 +375,12 @@ class AdminManager(Manager):
managed client objects
"""
def __init__(self, interface='json'):
def __init__(self, interface='json', service=None):
super(AdminManager, self).__init__(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name,
interface=interface)
interface=interface,
service=service)
class ComputeAdminManager(Manager):
@ -382,12 +390,13 @@ class ComputeAdminManager(Manager):
managed client objects
"""
def __init__(self, interface='json'):
def __init__(self, interface='json', service=None):
base = super(ComputeAdminManager, self)
base.__init__(CONF.compute_admin.username,
CONF.compute_admin.password,
CONF.compute_admin.tenant_name,
interface=interface)
interface=interface,
service=service)
class OrchestrationManager(Manager):
@ -395,9 +404,10 @@ class OrchestrationManager(Manager):
Manager object that uses the admin credentials for its
so that heat templates can create users
"""
def __init__(self, interface='json'):
def __init__(self, interface='json', service=None):
base = super(OrchestrationManager, self)
base.__init__(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.tenant_name,
interface=interface)
interface=interface,
service=service)

View File

@ -0,0 +1,239 @@
# Copyright 2014 Red Hat, Inc. & Deutsche Telekom AG
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import jsonschema
from tempest.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def generate_valid(schema):
"""
Create a valid dictionary based on the types in a json schema.
"""
LOG.debug("generate_valid: %s" % schema)
schema_type = schema["type"]
if isinstance(schema_type, list):
# Just choose the first one since all are valid.
schema_type = schema_type[0]
return type_map_valid[schema_type](schema)
def generate_valid_string(schema):
size = schema.get("minLength", 0)
# TODO(dkr mko): handle format and pattern
return "x" * size
def generate_valid_integer(schema):
# TODO(dkr mko): handle multipleOf
if "minimum" in schema:
minimum = schema["minimum"]
if "exclusiveMinimum" not in schema:
return minimum
else:
return minimum + 1
if "maximum" in schema:
maximum = schema["maximum"]
if "exclusiveMaximum" not in schema:
return maximum
else:
return maximum - 1
return 0
def generate_valid_object(schema):
obj = {}
for k, v in schema["properties"].iteritems():
obj[k] = generate_valid(v)
return obj
def generate_invalid(schema):
"""
Generate an invalid json dictionary based on a schema.
Only one value is mis-generated for each dictionary created.
Any generator must return a list of tuples or a single tuple.
The values of this tuple are:
result[0]: Name of the test
result[1]: json schema for the test
result[2]: expected result of the test (can be None)
"""
LOG.debug("generate_invalid: %s" % schema)
schema_type = schema["type"]
if isinstance(schema_type, list):
if "integer" in schema_type:
schema_type = "integer"
else:
raise Exception("non-integer list types not supported")
result = []
for generator in type_map_invalid[schema_type]:
ret = generator(schema)
if ret is not None:
if isinstance(ret, list):
result.extend(ret)
elif isinstance(ret, tuple):
result.append(ret)
else:
raise Exception("generator (%s) returns invalid result"
% generator)
LOG.debug("result: %s" % result)
return result
def _check_for_expected_result(name, schema):
expected_result = None
if "results" in schema:
if name in schema["results"]:
expected_result = schema["results"][name]
return expected_result
def generator(fn):
"""
Decorator for simple generators that simply return one value
"""
def wrapped(schema):
result = fn(schema)
if result is not None:
expected_result = _check_for_expected_result(fn.__name__, schema)
return (fn.__name__, result, expected_result)
return
return wrapped
@generator
def gen_int(_):
return 4
@generator
def gen_string(_):
return "XXXXXX"
def gen_none(schema):
# Note(mkoderer): it's not using the decorator otherwise it'd be filtered
expected_result = _check_for_expected_result('gen_none', schema)
return ('gen_none', None, expected_result)
@generator
def gen_str_min_length(schema):
min_length = schema.get("minLength", 0)
if min_length > 0:
return "x" * (min_length - 1)
@generator
def gen_str_max_length(schema):
max_length = schema.get("maxLength", -1)
if max_length > -1:
return "x" * (max_length + 1)
@generator
def gen_int_min(schema):
if "minimum" in schema:
minimum = schema["minimum"]
if "exclusiveMinimum" not in schema:
minimum -= 1
return minimum
@generator
def gen_int_max(schema):
if "maximum" in schema:
maximum = schema["maximum"]
if "exclusiveMaximum" not in schema:
maximum += 1
return maximum
def gen_obj_remove_attr(schema):
invalids = []
valid = generate_valid(schema)
required = schema.get("required", [])
for r in required:
new_valid = copy.deepcopy(valid)
del new_valid[r]
invalids.append(("gen_obj_remove_attr", new_valid, None))
return invalids
@generator
def gen_obj_add_attr(schema):
valid = generate_valid(schema)
if not schema.get("additionalProperties", True):
new_valid = copy.deepcopy(valid)
new_valid["$$$$$$$$$$"] = "xxx"
return new_valid
def gen_inv_prop_obj(schema):
LOG.debug("generate_invalid_object: %s" % schema)
valid = generate_valid(schema)
invalids = []
properties = schema["properties"]
for k, v in properties.iteritems():
for invalid in generate_invalid(v):
LOG.debug(v)
new_valid = copy.deepcopy(valid)
new_valid[k] = invalid[1]
name = "prop_%s_%s" % (k, invalid[0])
invalids.append((name, new_valid, invalid[2]))
LOG.debug("generate_invalid_object return: %s" % invalids)
return invalids
type_map_valid = {"string": generate_valid_string,
"integer": generate_valid_integer,
"object": generate_valid_object}
type_map_invalid = {"string": [gen_int,
gen_none,
gen_str_min_length,
gen_str_max_length],
"integer": [gen_string,
gen_none,
gen_int_min,
gen_int_max],
"object": [gen_obj_remove_attr,
gen_obj_add_attr,
gen_inv_prop_obj]}
schema = {"type": "object",
"properties":
{"name": {"type": "string"},
"http-method": {"enum": ["GET", "PUT", "HEAD",
"POST", "PATCH", "DELETE", 'COPY']},
"url": {"type": "string"},
"json-schema": jsonschema._utils.load_schema("draft4"),
"resources": {"type": "array", "items": {"type": "string"}},
"results": {"type": "object",
"properties": {}}
},
"required": ["name", "http-method", "url"],
"additionalProperties": False,
}
def validate_negative_test_schema(nts):
jsonschema.validate(nts, schema)

View File

@ -559,3 +559,33 @@ class RestClientXML(RestClient):
'retry-after' not in resp):
return True
return 'exceed' in resp_body.get('message', 'blabla')
class NegativeRestClient(RestClient):
"""
Version of RestClient that does not raise exceptions.
"""
def _error_checker(self, method, url,
headers, body, resp, resp_body):
pass
def send_request(self, method, url_template, resources, body=None):
url = url_template % tuple(resources)
if method == "GET":
resp, body = self.get(url)
elif method == "POST":
resp, body = self.post(url, body, self.headers)
elif method == "PUT":
resp, body = self.put(url, body, self.headers)
elif method == "PATCH":
resp, body = self.patch(url, body, self.headers)
elif method == "HEAD":
resp, body = self.head(url)
elif method == "DELETE":
resp, body = self.delete(url)
elif method == "COPY":
resp, body = self.copy(url)
else:
assert False
return resp, body

View File

@ -15,8 +15,11 @@
import atexit
import functools
import json
import os
import time
import urllib
import uuid
import fixtures
import nose.plugins.attrib
@ -24,6 +27,7 @@ import testresources
import testtools
from tempest import clients
from tempest.common import generate_json
from tempest.common import isolated_creds
from tempest import config
from tempest import exceptions
@ -224,6 +228,7 @@ class BaseTestCase(testtools.TestCase,
testresources.ResourcedTestCase):
setUpClassCalled = False
_service = None
network_resources = {}
@ -286,23 +291,27 @@ class BaseTestCase(testtools.TestCase,
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
interface=cls._interface,
service=cls._service)
elif interface:
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=interface)
interface=interface,
service=cls._service)
else:
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name)
tenant_name=tenant_name,
service=cls._service)
else:
if getattr(cls, '_interface', None):
os = clients.Manager(interface=cls._interface)
os = clients.Manager(interface=cls._interface,
service=cls._service)
elif interface:
os = clients.Manager(interface=interface)
os = clients.Manager(interface=interface, service=cls._service)
else:
os = clients.Manager()
os = clients.Manager(service=cls._service)
return os
@classmethod
@ -318,7 +327,8 @@ class BaseTestCase(testtools.TestCase,
"""
Returns an instance of the Identity Admin API client
"""
os = clients.AdminManager(interface=cls._interface)
os = clients.AdminManager(interface=cls._interface,
service=cls._service)
admin_client = os.identity_client
return admin_client
@ -344,6 +354,171 @@ class BaseTestCase(testtools.TestCase,
'dhcp': dhcp}
class NegativeAutoTest(BaseTestCase):
_resources = {}
@classmethod
def setUpClass(cls):
super(NegativeAutoTest, cls).setUpClass()
os = cls.get_client_manager()
cls.client = os.negative_client
@staticmethod
def load_schema(file):
"""
Loads a schema from a file on a specified location.
:param file: the file name
"""
#NOTE(mkoderer): must be extended for xml support
fn = os.path.join(
os.path.abspath(os.path.dirname(os.path.dirname(__file__))),
"etc", "schemas", file)
LOG.debug("Open schema file: %s" % (fn))
return json.load(open(fn))
@staticmethod
def generate_scenario(description_file):
"""
Generates the test scenario list for a given description.
:param description: A dictionary with the following entries:
name (required) name for the api
http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
url (required) the url to be appended to the catalog url with '%s'
for each resource mentioned
resources: (optional) A list of resource names such as "server",
"flavor", etc. with an element for each '%s' in the url. This
method will call self.get_resource for each element when
constructing the positive test case template so negative
subclasses are expected to return valid resource ids when
appropriate.
json-schema (optional) A valid json schema that will be used to
create invalid data for the api calls. For "GET" and "HEAD",
the data is used to generate query strings appended to the url,
otherwise for the body of the http call.
"""
description = NegativeAutoTest.load_schema(description_file)
LOG.debug(description)
generate_json.validate_negative_test_schema(description)
schema = description.get("json-schema", None)
resources = description.get("resources", [])
scenario_list = []
for resource in resources:
LOG.debug("Add resource to test %s" % resource)
scn_name = "inv_res_%s" % (resource)
scenario_list.append((scn_name, {"resource": (resource,
str(uuid.uuid4()))
}))
if schema is not None:
for invalid in generate_json.generate_invalid(schema):
scenario_list.append((invalid[0],
{"schema": invalid[1],
"expected_result": invalid[2]}))
LOG.debug(scenario_list)
return scenario_list
def execute(self, description_file):
"""
Execute a http call on an api that are expected to
result in client errors. First it uses invalid resources that are part
of the url, and then invalid data for queries and http request bodies.
:param description: A dictionary with the following entries:
name (required) name for the api
http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
url (required) the url to be appended to the catalog url with '%s'
for each resource mentioned
resources: (optional) A list of resource names such as "server",
"flavor", etc. with an element for each '%s' in the url. This
method will call self.get_resource for each element when
constructing the positive test case template so negative
subclasses are expected to return valid resource ids when
appropriate.
json-schema (optional) A valid json schema that will be used to
create invalid data for the api calls. For "GET" and "HEAD",
the data is used to generate query strings appended to the url,
otherwise for the body of the http call.
"""
description = NegativeAutoTest.load_schema(description_file)
LOG.info("Executing %s" % description["name"])
LOG.debug(description)
method = description["http-method"]
url = description["url"]
resources = [self.get_resource(r) for
r in description.get("resources", [])]
if hasattr(self, "resource"):
# Note(mkoderer): The resources list already contains an invalid
# entry (see get_resource).
# We just send a valid json-schema with it
valid = None
schema = description.get("json-schema", None)
if schema:
valid = generate_json.generate_valid(schema)
new_url, body = self._http_arguments(valid, url, method)
resp, resp_body = self.client.send_request(method, new_url,
resources, body=body)
self._check_negative_response(resp.status, resp_body)
return
if hasattr(self, "schema"):
new_url, body = self._http_arguments(self.schema, url, method)
resp, resp_body = self.client.send_request(method, new_url,
resources, body=body)
self._check_negative_response(resp.status, resp_body)
def _http_arguments(self, json_dict, url, method):
LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
if not json_dict:
return url, None
elif method in ["GET", "HEAD", "PUT", "DELETE"]:
return "%s?%s" % (url, urllib.urlencode(json_dict)), None
else:
return url, json.dumps(json_dict)
def _check_negative_response(self, result, body):
expected_result = getattr(self, "expected_result", None)
self.assertTrue(result >= 400 and result < 500 and result != 413,
"Expected client error, got %s:%s" %
(result, body))
self.assertTrue(expected_result is None or expected_result == result,
"Expected %s, got %s:%s" %
(expected_result, result, body))
@classmethod
def set_resource(cls, name, resource):
"""
This function can be used in setUpClass context to register a resoruce
for a test.
:param name: The name of the kind of resource such as "flavor", "role",
etc.
:resource: The id of the resource
"""
cls._resources[name] = resource
def get_resource(self, name):
"""
Return a valid uuid for a type of resource. If a real resource is
needed as part of a url then this method should return one. Otherwise
it can return None.
:param name: The name of the kind of resource such as "flavor", "role",
etc.
"""
if hasattr(self, "resource") and self.resource[0] == name:
LOG.debug("Return invalid resource (%s) value: %s" %
(self.resource[0], self.resource[1]))
return self.resource[1]
if name in self._resources:
return self._resources[name]
return None
def call_until_true(func, duration, sleep_for):
"""
Call the given function until it returns True (and return True) or