# vim: tabstop=4 shiftwidth=4 softtabstop=4 import inspect import webob from nova.api.openstack import wsgi from nova import exception from nova.openstack.common import gettextutils from nova import test from nova.tests.api.openstack import fakes from nova.tests import utils class RequestTest(test.TestCase): def test_content_type_missing(self): request = wsgi.Request.blank('/tests/123', method='POST') request.body = "" self.assertEqual(None, request.get_content_type()) def test_content_type_unsupported(self): request = wsgi.Request.blank('/tests/123', method='POST') request.headers["Content-Type"] = "text/html" request.body = "asdf
" self.assertRaises(exception.InvalidContentType, request.get_content_type) def test_content_type_with_charset(self): request = wsgi.Request.blank('/tests/123') request.headers["Content-Type"] = "application/json; charset=UTF-8" result = request.get_content_type() self.assertEqual(result, "application/json") def test_content_type_from_accept(self): for content_type in ('application/xml', 'application/vnd.openstack.compute+xml', 'application/json', 'application/vnd.openstack.compute+json'): request = wsgi.Request.blank('/tests/123') request.headers["Accept"] = content_type result = request.best_match_content_type() self.assertEqual(result, content_type) def test_content_type_from_accept_best(self): request = wsgi.Request.blank('/tests/123') request.headers["Accept"] = "application/xml, application/json" result = request.best_match_content_type() self.assertEqual(result, "application/json") request = wsgi.Request.blank('/tests/123') request.headers["Accept"] = ("application/json; q=0.3, " "application/xml; q=0.9") result = request.best_match_content_type() self.assertEqual(result, "application/xml") def test_content_type_from_query_extension(self): request = wsgi.Request.blank('/tests/123.xml') result = request.best_match_content_type() self.assertEqual(result, "application/xml") request = wsgi.Request.blank('/tests/123.json') result = request.best_match_content_type() self.assertEqual(result, "application/json") request = wsgi.Request.blank('/tests/123.invalid') result = request.best_match_content_type() self.assertEqual(result, "application/json") def test_content_type_accept_and_query_extension(self): request = wsgi.Request.blank('/tests/123.xml') request.headers["Accept"] = "application/json" result = request.best_match_content_type() self.assertEqual(result, "application/xml") def test_content_type_accept_default(self): request = wsgi.Request.blank('/tests/123.unsupported') request.headers["Accept"] = "application/unsupported1" result = request.best_match_content_type() self.assertEqual(result, "application/json") def test_cache_and_retrieve_instances(self): request = wsgi.Request.blank('/foo') instances = [] for x in xrange(3): instances.append({'uuid': 'uuid%s' % x}) # Store 2 request.cache_db_instances(instances[:2]) # Store 1 request.cache_db_instance(instances[2]) self.assertEqual(request.get_db_instance('uuid0'), instances[0]) self.assertEqual(request.get_db_instance('uuid1'), instances[1]) self.assertEqual(request.get_db_instance('uuid2'), instances[2]) self.assertEqual(request.get_db_instance('uuid3'), None) self.assertEqual(request.get_db_instances(), {'uuid0': instances[0], 'uuid1': instances[1], 'uuid2': instances[2]}) def test_from_request(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'bogus;q=1.1, en-gb;q=0.7,en-us,en;q=.5,*;q=.7' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_US') def test_asterisk(self): # asterisk should match first available if there # are not any other available matches self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = '*,es;q=.5' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_GB') def test_prefix(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'zh' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'zh_CN') def test_secondary(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'nn,en-gb;q=.5' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_GB') def test_none_found(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'nb-no' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_US') class ActionDispatcherTest(test.TestCase): def test_dispatch(self): serializer = wsgi.ActionDispatcher() serializer.create = lambda x: 'pants' self.assertEqual(serializer.dispatch({}, action='create'), 'pants') def test_dispatch_action_None(self): serializer = wsgi.ActionDispatcher() serializer.create = lambda x: 'pants' serializer.default = lambda x: 'trousers' self.assertEqual(serializer.dispatch({}, action=None), 'trousers') def test_dispatch_default(self): serializer = wsgi.ActionDispatcher() serializer.create = lambda x: 'pants' serializer.default = lambda x: 'trousers' self.assertEqual(serializer.dispatch({}, action='update'), 'trousers') class DictSerializerTest(test.TestCase): def test_dispatch_default(self): serializer = wsgi.DictSerializer() self.assertEqual(serializer.serialize({}, 'update'), '') class XMLDictSerializerTest(test.TestCase): def test_xml(self): input_dict = dict(servers=dict(a=(2, 3))) expected_xml = '(2,3)' serializer = wsgi.XMLDictSerializer(xmlns="asdf") result = serializer.serialize(input_dict) result = result.replace('\n', '').replace(' ', '') self.assertEqual(result, expected_xml) class JSONDictSerializerTest(test.TestCase): def test_json(self): input_dict = dict(servers=dict(a=(2, 3))) expected_json = '{"servers":{"a":[2,3]}}' serializer = wsgi.JSONDictSerializer() result = serializer.serialize(input_dict) result = result.replace('\n', '').replace(' ', '') self.assertEqual(result, expected_json) class TextDeserializerTest(test.TestCase): def test_dispatch_default(self): deserializer = wsgi.TextDeserializer() self.assertEqual(deserializer.deserialize({}, 'update'), {}) class JSONDeserializerTest(test.TestCase): def test_json(self): data = """{"a": { "a1": "1", "a2": "2", "bs": ["1", "2", "3", {"c": {"c1": "1"}}], "d": {"e": "1"}, "f": "1"}}""" as_dict = { 'body': { 'a': { 'a1': '1', 'a2': '2', 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], 'd': {'e': '1'}, 'f': '1', }, }, } deserializer = wsgi.JSONDeserializer() self.assertEqual(deserializer.deserialize(data), as_dict) def test_json_valid_utf8(self): data = """{"server": {"min_count": 1, "flavorRef": "1", "name": "\xe6\xa6\x82\xe5\xbf\xb5", "imageRef": "10bab10c-1304-47d", "max_count": 1}} """ as_dict = { 'body': { u'server': { u'min_count': 1, u'flavorRef': u'1', u'name': u'\u6982\u5ff5', u'imageRef': u'10bab10c-1304-47d', u'max_count': 1 } } } deserializer = wsgi.JSONDeserializer() self.assertEqual(deserializer.deserialize(data), as_dict) def test_json_invalid_utf8(self): """ Send invalid utf-8 to JSONDeserializer""" data = """{"server": {"min_count": 1, "flavorRef": "1", "name": "\xf0\x28\x8c\x28", "imageRef": "10bab10c-1304-47d", "max_count": 1}} """ deserializer = wsgi.JSONDeserializer() self.assertRaises(exception.MalformedRequestBody, deserializer.deserialize, data) class XMLDeserializerTest(test.TestCase): def test_xml(self): xml = """ 123 1 1 """.strip() as_dict = { 'body': { 'a': { 'a1': '1', 'a2': '2', 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], 'd': {'e': '1'}, 'f': '1', }, }, } metadata = {'plurals': {'bs': 'b', 'ts': 't'}} deserializer = wsgi.XMLDeserializer(metadata=metadata) self.assertEqual(deserializer.deserialize(xml), as_dict) def test_xml_empty(self): xml = '' as_dict = {"body": {"a": {}}} deserializer = wsgi.XMLDeserializer() self.assertEqual(deserializer.deserialize(xml), as_dict) def test_xml_valid_utf8(self): xml = """ \xe6\xa6\x82\xe5\xbf\xb5 """ deserializer = wsgi.XMLDeserializer() as_dict = {'body': {u'a': {u'name': u'\u6982\u5ff5'}}} self.assertEqual(deserializer.deserialize(xml), as_dict) def test_xml_invalid_utf8(self): """ Send invalid utf-8 to XMLDeserializer""" xml = """ \xf0\x28\x8c\x28 """ deserializer = wsgi.XMLDeserializer() self.assertRaises(exception.MalformedRequestBody, deserializer.deserialize, xml) class ResourceTest(test.TestCase): def test_resource_call(self): class Controller(object): def index(self, req): return 'off' req = webob.Request.blank('/tests') app = fakes.TestRouter(Controller()) response = req.get_response(app) self.assertEqual(response.body, 'off') self.assertEqual(response.status_int, 200) def test_resource_not_authorized(self): class Controller(object): def index(self, req): raise exception.NotAuthorized() req = webob.Request.blank('/tests') app = fakes.TestRouter(Controller()) response = req.get_response(app) self.assertEqual(response.status_int, 403) def test_dispatch(self): class Controller(object): def index(self, req, pants=None): return pants controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'index', None, '') actual = resource.dispatch(method, None, {'pants': 'off'}) expected = 'off' self.assertEqual(actual, expected) def test_get_method_unknown_controller_method(self): class Controller(object): def index(self, req, pants=None): return pants controller = Controller() resource = wsgi.Resource(controller) self.assertRaises(AttributeError, resource.get_method, None, 'create', None, '') def test_get_method_action_json(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'action', 'application/json', '{"fooAction": true}') self.assertEqual(controller._action_foo, method) def test_get_method_action_xml(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'action', 'application/xml', 'true') self.assertEqual(controller._action_foo, method) def test_get_method_action_corrupt_xml(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) self.assertRaises( exception.MalformedRequestBody, resource.get_method, None, 'action', 'application/xml', utils.killer_xml_body()) def test_get_method_action_bad_body(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) self.assertRaises(exception.MalformedRequestBody, resource.get_method, None, 'action', 'application/json', '{}') def test_get_method_unknown_controller_action(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) self.assertRaises(KeyError, resource.get_method, None, 'action', 'application/json', '{"barAction": true}') def test_get_method_action_method(self): class Controller(): def action(self, req, pants=None): return pants controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'action', 'application/xml', 'true