Merge "Add more test for magnum API"

This commit is contained in:
Jenkins
2014-12-30 15:55:18 +00:00
committed by Gerrit Code Review
4 changed files with 499 additions and 1 deletions

235
magnum/tests/api/base.py Normal file
View File

@@ -0,0 +1,235 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base classes for API tests."""
# NOTE: Ported from ceilometer/tests/api.py (subsequently moved to
# ceilometer/tests/api/__init__.py). This should be oslo'ified:
# https://bugs.launchpad.net/ironic/+bug/1255115.
# NOTE(deva): import auth_token so we can override a config option
from keystonemiddleware import auth_token # noqa
from oslo.config import cfg
import pecan
import pecan.testing
from six.moves.urllib import parse as urlparse
from magnum.db import api as dbapi
from magnum.tests.db import base
PATH_PREFIX = '/v1'
class FunctionalTest(base.DbTestCase):
"""Used for functional tests of Pecan controllers where you need to
test your literal application and its integration with the
framework.
"""
SOURCE_DATA = {'test_source': {'somekey': '666'}}
def setUp(self):
super(FunctionalTest, self).setUp()
cfg.CONF.set_override("auth_version", "v2.0",
group='keystone_authtoken')
cfg.CONF.set_override("admin_user", "admin",
group='keystone_authtoken')
self.app = self._make_app()
self.dbapi = dbapi.get_instance()
def reset_pecan():
pecan.set_config({}, overwrite=True)
self.addCleanup(reset_pecan)
def _make_app(self, enable_acl=False):
# Determine where we are so we can set up paths in the config
root_dir = self.path_get()
self.config = {
'app': {
'root': 'magnum.api.controllers.root.RootController',
'modules': ['magnum.api'],
'static_root': '%s/public' % root_dir,
'template_path': '%s/api/templates' % root_dir,
'enable_acl': enable_acl,
'acl_public_routes': ['/', '/v1'],
},
}
return pecan.testing.load_test_app(self.config)
def _request_json(self, path, params, expect_errors=False, headers=None,
method="post", extra_environ=None, status=None,
path_prefix=PATH_PREFIX):
"""Sends simulated HTTP request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param method: Request method type. Appropriate method function call
should be used rather than passing attribute in.
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
:param path_prefix: prefix of the url path
"""
full_path = path_prefix + path
print('%s: %s %s' % (method.upper(), full_path, params))
response = getattr(self.app, "%s_json" % method)(
str(full_path),
params=params,
headers=headers,
status=status,
extra_environ=extra_environ,
expect_errors=expect_errors
)
print('GOT:%s' % response)
return response
def put_json(self, path, params, expect_errors=False, headers=None,
extra_environ=None, status=None):
"""Sends simulated HTTP PUT request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
"""
return self._request_json(path=path, params=params,
expect_errors=expect_errors,
headers=headers, extra_environ=extra_environ,
status=status, method="put")
def post_json(self, path, params, expect_errors=False, headers=None,
extra_environ=None, status=None):
"""Sends simulated HTTP POST request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
"""
return self._request_json(path=path, params=params,
expect_errors=expect_errors,
headers=headers, extra_environ=extra_environ,
status=status, method="post")
def patch_json(self, path, params, expect_errors=False, headers=None,
extra_environ=None, status=None):
"""Sends simulated HTTP PATCH request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
"""
return self._request_json(path=path, params=params,
expect_errors=expect_errors,
headers=headers, extra_environ=extra_environ,
status=status, method="patch")
def delete(self, path, expect_errors=False, headers=None,
extra_environ=None, status=None, path_prefix=PATH_PREFIX):
"""Sends simulated HTTP DELETE request to Pecan test app.
:param path: url path of target service
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
:param path_prefix: prefix of the url path
"""
full_path = path_prefix + path
print('DELETE: %s' % (full_path))
response = self.app.delete(str(full_path),
headers=headers,
status=status,
extra_environ=extra_environ,
expect_errors=expect_errors)
print('GOT:%s' % response)
return response
def get_json(self, path, expect_errors=False, headers=None,
extra_environ=None, q=[], path_prefix=PATH_PREFIX, **params):
"""Sends simulated HTTP GET request to Pecan test app.
:param path: url path of target service
:param expect_errors: Boolean value;whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param q: list of queries consisting of: field, value, op, and type
keys
:param path_prefix: prefix of the url path
:param params: content for wsgi.input of request
"""
full_path = path_prefix + path
query_params = {'q.field': [],
'q.value': [],
'q.op': [],
}
for query in q:
for name in ['field', 'op', 'value']:
query_params['q.%s' % name].append(query.get(name, ''))
all_params = {}
all_params.update(params)
if q:
all_params.update(query_params)
print('GET: %s %r' % (full_path, all_params))
response = self.app.get(full_path,
params=all_params,
headers=headers,
extra_environ=extra_environ,
expect_errors=expect_errors)
if not expect_errors:
response = response.json
print('GOT:%s' % response)
return response
def validate_link(self, link, bookmark=False):
"""Checks if the given link can get correct data."""
# removes the scheme and net location parts of the link
url_parts = list(urlparse.urlparse(link))
url_parts[0] = url_parts[1] = ''
# bookmark link should not have the version in the URL
if bookmark and url_parts[2].startswith(PATH_PREFIX):
return False
full_path = urlparse.urlunparse(url_parts)
try:
self.get_json(full_path, path_prefix='')
return True
except Exception:
return False

View File

@@ -0,0 +1,198 @@
# coding: utf-8
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import six
import webtest
import wsme
from magnum.api.controllers.v1 import types
from magnum.common import exception
from magnum.common import utils
from magnum.tests.api import base
class TestMacAddressType(base.FunctionalTest):
def test_valid_mac_addr(self):
test_mac = 'aa:bb:cc:11:22:33'
with mock.patch.object(utils, 'validate_and_normalize_mac') as m_mock:
types.MacAddressType.validate(test_mac)
m_mock.assert_called_once_with(test_mac)
def test_invalid_mac_addr(self):
self.assertRaises(exception.InvalidMAC,
types.MacAddressType.validate, 'invalid-mac')
class TestUuidType(base.FunctionalTest):
def test_valid_uuid(self):
test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e'
with mock.patch.object(utils, 'is_uuid_like') as uuid_mock:
types.UuidType.validate(test_uuid)
uuid_mock.assert_called_once_with(test_uuid)
def test_invalid_uuid(self):
self.assertRaises(exception.InvalidUUID,
types.UuidType.validate, 'invalid-uuid')
class MyPatchType(types.JsonPatchType):
"""Helper class for TestJsonPatchType tests."""
@staticmethod
def mandatory_attrs():
return ['/mandatory']
@staticmethod
def internal_attrs():
return ['/internal']
class MyRoot(wsme.WSRoot):
"""Helper class for TestJsonPatchType tests."""
@wsme.expose([wsme.types.text], body=[MyPatchType])
@wsme.validate([MyPatchType])
def test(self, patch):
return patch
class TestJsonPatchType(base.FunctionalTest):
def setUp(self):
super(TestJsonPatchType, self).setUp()
self.app = webtest.TestApp(MyRoot(['restjson']).wsgiapp())
def _patch_json(self, params, expect_errors=False):
return self.app.patch_json('/test', params=params,
headers={'Accept': 'application/json'},
expect_errors=expect_errors)
def test_valid_patches(self):
valid_patches = [{'path': '/extra/foo', 'op': 'remove'},
{'path': '/extra/foo', 'op': 'add', 'value': 'bar'},
{'path': '/foo', 'op': 'replace', 'value': 'bar'}]
ret = self._patch_json(valid_patches, False)
self.assertEqual(200, ret.status_int)
self.assertEqual(sorted(valid_patches), sorted(ret.json))
def test_cannot_update_internal_attr(self):
patch = [{'path': '/internal', 'op': 'replace', 'value': 'foo'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_mandatory_attr(self):
patch = [{'op': 'replace', 'path': '/mandatory', 'value': 'foo'}]
ret = self._patch_json(patch, False)
self.assertEqual(200, ret.status_int)
self.assertEqual(patch, ret.json)
def test_cannot_remove_mandatory_attr(self):
patch = [{'op': 'remove', 'path': '/mandatory'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_missing_required_fields_path(self):
missing_path = [{'op': 'remove'}]
ret = self._patch_json(missing_path, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_missing_required_fields_op(self):
missing_op = [{'path': '/foo'}]
ret = self._patch_json(missing_op, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_invalid_op(self):
patch = [{'path': '/foo', 'op': 'invalid'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_invalid_path(self):
patch = [{'path': 'invalid-path', 'op': 'remove'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_cannot_add_with_no_value(self):
patch = [{'path': '/extra/foo', 'op': 'add'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_cannot_replace_with_no_value(self):
patch = [{'path': '/foo', 'op': 'replace'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
class TestMultiType(base.FunctionalTest):
def test_valid_values(self):
vt = types.MultiType(wsme.types.text, six.integer_types)
value = vt.validate("hello")
self.assertEqual("hello", value)
value = vt.validate(10)
self.assertEqual(10, value)
def test_invalid_values(self):
vt = types.MultiType(wsme.types.text, six.integer_types)
self.assertRaises(ValueError, vt.validate, 0.10)
self.assertRaises(ValueError, vt.validate, object())
def test_multitype_tostring(self):
vt = types.MultiType(str, int)
vts = str(vt)
self.assertIn(str(str), vts)
self.assertIn(str(int), vts)
class TestBooleanType(base.FunctionalTest):
def test_valid_true_values(self):
v = types.BooleanType()
self.assertTrue(v.validate("true"))
self.assertTrue(v.validate("TRUE"))
self.assertTrue(v.validate("True"))
self.assertTrue(v.validate("t"))
self.assertTrue(v.validate("1"))
self.assertTrue(v.validate("y"))
self.assertTrue(v.validate("yes"))
self.assertTrue(v.validate("on"))
def test_valid_false_values(self):
v = types.BooleanType()
self.assertFalse(v.validate("false"))
self.assertFalse(v.validate("FALSE"))
self.assertFalse(v.validate("False"))
self.assertFalse(v.validate("f"))
self.assertFalse(v.validate("0"))
self.assertFalse(v.validate("n"))
self.assertFalse(v.validate("no"))
self.assertFalse(v.validate("off"))
def test_invalid_value(self):
v = types.BooleanType()
self.assertRaises(exception.Invalid, v.validate, "invalid-value")
self.assertRaises(exception.Invalid, v.validate, "01")

View File

@@ -0,0 +1,49 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import wsme
from magnum.api.controllers.v1 import utils
from magnum.tests.api import base
from oslo.config import cfg
CONF = cfg.CONF
class TestApiUtils(base.FunctionalTest):
def test_validate_limit(self):
limit = utils.validate_limit(10)
self.assertEqual(10, 10)
# max limit
limit = utils.validate_limit(999999999)
self.assertEqual(CONF.api.max_limit, limit)
# negative
self.assertRaises(wsme.exc.ClientSideError, utils.validate_limit, -1)
# zero
self.assertRaises(wsme.exc.ClientSideError, utils.validate_limit, 0)
def test_validate_sort_dir(self):
sort_dir = utils.validate_sort_dir('asc')
self.assertEqual('asc', sort_dir)
# invalid sort_dir parameter
self.assertRaises(wsme.exc.ClientSideError,
utils.validate_sort_dir,
'fake-sort')

View File

@@ -57,4 +57,20 @@ class TestCase(base.BaseTestCase):
"""Override config options for a test."""
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
CONF.set_override(k, v, group)
def path_get(self, project_file=None):
"""Get the absolute path to a file. Used for testing the API.
:param project_file: File whose path to return. Default: None.
:returns: path to the specified file, or path to project root.
"""
root = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..',
'..',
)
)
if project_file:
return os.path.join(root, project_file)
else:
return root