The TestResponse object doesn't do the right thing with regard to content vs. text. Just reuse the one from requests_mock rather than try to fix it. Change-Id: Ia8bcae126babb0e616329928c57f875a50a957d6
		
			
				
	
	
		
			205 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			205 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#    not use this file except in compliance with the License. You may obtain
 | 
						|
#    a copy of the License at
 | 
						|
#
 | 
						|
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#    Unless required by applicable law or agreed to in writing, software
 | 
						|
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#    License for the specific language governing permissions and limitations
 | 
						|
#    under the License.
 | 
						|
 | 
						|
import logging
 | 
						|
import sys
 | 
						|
import uuid
 | 
						|
 | 
						|
import fixtures
 | 
						|
from oslo_serialization import jsonutils
 | 
						|
import requests
 | 
						|
import requests_mock
 | 
						|
from requests_mock.contrib import fixture
 | 
						|
import six
 | 
						|
from six.moves.urllib import parse as urlparse
 | 
						|
import testscenarios
 | 
						|
import testtools
 | 
						|
 | 
						|
from keystoneclient.tests.unit import client_fixtures
 | 
						|
 | 
						|
 | 
						|
class TestCase(testtools.TestCase):
    """Base test case providing HTTP mocking and request assertions.

    ``setUp`` installs three fixtures: deprecation handling
    (``self.deprecations``), a DEBUG-level fake logger (``self.logger``)
    and a ``requests_mock`` fixture (``self.requests_mock``). The assert
    helpers below all inspect ``self.requests_mock.last_request``, so a
    request must have been made before they are called.
    """

    # Random identifiers, generated once when the class body is executed.
    TEST_DOMAIN_ID = uuid.uuid4().hex
    TEST_DOMAIN_NAME = uuid.uuid4().hex
    TEST_GROUP_ID = uuid.uuid4().hex
    TEST_ROLE_ID = uuid.uuid4().hex
    TEST_TENANT_ID = uuid.uuid4().hex
    TEST_TENANT_NAME = uuid.uuid4().hex
    TEST_TOKEN = uuid.uuid4().hex
    TEST_TRUST_ID = uuid.uuid4().hex
    TEST_USER = uuid.uuid4().hex
    TEST_USER_ID = uuid.uuid4().hex

    TEST_ROOT_URL = 'http://127.0.0.1:5000/'

    def setUp(self):
        """Install the deprecation, logging and requests_mock fixtures."""
        super(TestCase, self).setUp()
        self.deprecations = self.useFixture(client_fixtures.Deprecations())

        self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
        self.requests_mock = self.useFixture(fixture.Fixture())

    def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs):
        """Register a mocked response for ``method`` on a URL.

        :param method: HTTP method passed through to ``register_uri``.
        :param parts: optional list of path segments appended to the base
                      URL; surrounding slashes are stripped from each piece
                      before joining with ``/``.
        :param base_url: URL to build on. When falsy, ``self.TEST_URL`` is
                         used; that attribute is not defined on this class,
                         so presumably subclasses provide it.
        :param json: when truthy, serialized as the response text and the
                     ``Content-Type`` header is set to ``application/json``.
                     Falsy values (including ``{}`` and ``[]``) are ignored.
        :param kwargs: forwarded to ``requests_mock``'s ``register_uri``.
        """
        if not base_url:
            base_url = self.TEST_URL

        if json:
            kwargs['text'] = jsonutils.dumps(json)
            headers = kwargs.setdefault('headers', {})
            headers['Content-Type'] = 'application/json'

        if parts:
            url = '/'.join([p.strip('/') for p in [base_url] + parts])
        else:
            url = base_url

        # Joining can leave ".../?query"; collapse it so the query string
        # attaches directly to the last path segment.
        url = url.replace("/?", "?")
        self.requests_mock.register_uri(method, url, **kwargs)

    def assertRequestBodyIs(self, body=None, json=None):
        """Verify the body of the last request made.

        If ``json`` is truthy the request body is decoded as JSON and
        compared with it; otherwise, if ``body`` is truthy, the raw body is
        compared. When both are falsy nothing is checked.
        """
        last_request_body = self.requests_mock.last_request.body
        if json:
            val = jsonutils.loads(last_request_body)
            self.assertEqual(json, val)
        elif body:
            self.assertEqual(body, last_request_body)

    def assertQueryStringIs(self, qs=''):
        """Verify the QueryString matches what is expected.

        The qs parameter should be of the format ``foo=bar&abc=xyz``. Both
        the expected and the actual query strings are parsed (keeping blank
        values) and the resulting mappings must be equal.
        """
        expected = urlparse.parse_qs(qs, keep_blank_values=True)
        parts = urlparse.urlparse(self.requests_mock.last_request.url)
        querystring = urlparse.parse_qs(parts.query, keep_blank_values=True)
        self.assertEqual(expected, querystring)

    def assertQueryStringContains(self, **kwargs):
        """Verify the query string contains the expected parameters.

        This method is used to verify that the query string for the most recent
        request made contains all the parameters provided as ``kwargs``, and
        that the values of each parameter include the value for the kwarg. If
        the value for the kwarg is an empty string (''), then all that's
        verified is that the parameter is present.

        """
        parts = urlparse.urlparse(self.requests_mock.last_request.url)
        qs = urlparse.parse_qs(parts.query, keep_blank_values=True)

        for k, v in six.iteritems(kwargs):
            self.assertIn(k, qs)
            # parse_qs maps each name to a list of values, so '' would only
            # be "in" that list for a blank value. Skip the value check to
            # honor the documented presence-only behaviour.
            if v != '':
                self.assertIn(v, qs[k])

    def assertRequestHeaderEqual(self, name, val):
        """Verify that the last request made contains a header and its value.

        The request must have already been made. A missing header is
        looked up as ``None`` and compared against ``val``.
        """
        headers = self.requests_mock.last_request.headers
        self.assertEqual(headers.get(name), val)
 | 
						|
 | 
						|
 | 
						|
if sys.version_info[:2] < (2, 7):

    def assertDictEqual(self, d1, d2, msg=None):
        """Simplified backport of the Python 2.7 dictionary assertion.

        Both arguments must be dicts. When they differ the test fails with
        ``msg`` if one was given, otherwise with a ``repr``-based message.
        """
        self.assertIsInstance(
            d1, dict, 'First argument is not a dictionary')
        self.assertIsInstance(
            d2, dict, 'Second argument is not a dictionary')
        if d1 != d2:
            # The default message is only built when no custom one is given.
            self.fail(msg or '%r != %r' % (d1, d2))

    # Older interpreters lack the method, so graft it onto the base class.
    TestCase.assertDictEqual = assertDictEqual
 | 
						|
 | 
						|
 | 
						|
def test_response(**kwargs):
    """Build a response object via ``requests_mock.create_response``.

    A prepared GET request for ``http://localhost:5000`` is used as the
    originating request; ``kwargs`` are forwarded to ``create_response``.
    """
    request = requests.Request(url='http://localhost:5000', method='GET')
    prepared = request.prepare()
    return requests_mock.create_response(prepared, **kwargs)
 | 
						|
 | 
						|
 | 
						|
class DisableModuleFixture(fixtures.Fixture):
    """A fixture to provide support for unloading/disabling modules.

    On ``setUp`` the target module and all of its submodules are evicted
    from ``sys.modules`` and a ``NoModuleFinder`` is placed at the front of
    ``sys.meta_path`` so that importing them raises ``ImportError``. The
    change is undone by a cleanup registered with ``addCleanup``, which is
    the mechanism ``fixtures`` uses when a fixture is cleaned up.
    """

    def __init__(self, module, *args, **kw):
        """Remember which module (dotted name) should be disabled."""
        super(DisableModuleFixture, self).__init__(*args, **kw)
        self.module = module
        self._finders = []
        self._cleared_modules = {}

    def _restore(self):
        """Remove our finders and put the evicted modules back.

        Safe to call more than once: state is consumed as it is restored,
        so a second call is a no-op.
        """
        while self._finders:
            finder = self._finders.pop()
            if finder in sys.meta_path:
                sys.meta_path.remove(finder)
        sys.modules.update(self._cleared_modules)
        self._cleared_modules.clear()

    def tearDown(self):
        """Undo the fixture; kept for callers that invoke it explicitly."""
        # The base class is not guaranteed to define tearDown (fixtures
        # drives teardown through cleanUp()), so only chain up if it does.
        parent_teardown = getattr(
            super(DisableModuleFixture, self), 'tearDown', None)
        if parent_teardown is not None:
            parent_teardown()
        self._restore()

    def clear_module(self):
        """Pop the module and its submodules out of ``sys.modules``.

        :returns: dict mapping each removed module name to the module
                  object that was removed.
        """
        cleared_modules = {}
        # Iterate over a snapshot because sys.modules is mutated below.
        for fullname in list(sys.modules):
            if (fullname == self.module or
                    fullname.startswith(self.module + '.')):
                cleared_modules[fullname] = sys.modules.pop(fullname)
        return cleared_modules

    def setUp(self):
        """Ensure ImportError for the specified module."""

        super(DisableModuleFixture, self).setUp()

        # Register the undo step first so it runs on cleanUp() even though
        # nothing calls tearDown() automatically.
        self.addCleanup(self._restore)

        # Clear 'module' references in sys.modules
        self._cleared_modules.update(self.clear_module())

        finder = NoModuleFinder(self.module)
        self._finders.append(finder)
        sys.meta_path.insert(0, finder)
 | 
						|
 | 
						|
 | 
						|
class ClientTestCaseMixin(testscenarios.WithScenarios):
    """Scenario-aware mixin that installs client and data fixtures.

    Set ``client_fixture_class`` and/or ``data_fixture_class`` (typically
    through scenarios) to have ``setUp`` create and install them. The
    mixin relies on ``self.requests_mock``, ``self.deprecations`` and
    ``self.useFixture`` being available on the test case after the
    parent ``setUp`` has run.
    """

    client_fixture_class = None
    data_fixture_class = None

    def setUp(self):
        """Create the configured fixtures and expose them on the test."""
        super(ClientTestCaseMixin, self).setUp()

        # Start from a known state; each attribute stays None unless the
        # corresponding fixture class is configured.
        self.data_fixture = None
        self.client_fixture = None
        self.client = None

        if self.client_fixture_class:
            client_fix = self.client_fixture_class(
                self.requests_mock, self.deprecations)
            self.client_fixture = self.useFixture(client_fix)
            self.client = self.client_fixture.client
            # Shadow the class-level id with the one the fixture uses.
            self.TEST_USER_ID = self.client_fixture.user_id

        if self.data_fixture_class:
            data_fix = self.data_fixture_class(self.requests_mock)
            self.data_fixture = self.useFixture(data_fix)
 | 
						|
 | 
						|
 | 
						|
class NoModuleFinder(object):
    """Disallow further imports of 'module'.

    Intended to be inserted into ``sys.meta_path``. Any attempt to import
    ``module`` or one of its submodules raises ``ImportError``; every other
    name is declined (``None``) so the remaining finders handle it.

    Both finder protocols are implemented: ``find_module`` for legacy
    import systems and ``find_spec`` for the modern one, which no longer
    consults ``find_module`` on recent Python 3 releases.
    """

    def __init__(self, module):
        """Store the dotted name of the module to block."""
        self.module = module

    def _is_blocked(self, fullname):
        """Return True if ``fullname`` is the module or one of its children."""
        return (fullname == self.module or
                fullname.startswith(self.module + '.'))

    def find_module(self, fullname, path=None):
        """Legacy finder hook: raise ImportError for blocked names."""
        if self._is_blocked(fullname):
            raise ImportError('import of %s is disabled' % fullname)
        return None

    def find_spec(self, fullname, path=None, target=None):
        """Modern finder hook: raise ImportError for blocked names."""
        if self._is_blocked(fullname):
            raise ImportError('import of %s is disabled' % fullname)
        return None
 |