86fb33d13d
When creating a response, we type-check that the user has passed the correct type to content and text. However, we perform this check based on whether the value is truthy rather than whether it has been passed at all, which means that an empty string of the wrong type can incorrectly pass through the check. Fix the check so that the empty string is tested as well. Anyone relying on this passing would be caught by a more confusing TypeError later, so this should be backwards compatible. Change-Id: I826da9b7fd83bb88af50e4a96a5e6358ee35e4b2 Closes-Bug: #1647880
248 lines
9.8 KiB
Python
248 lines
9.8 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import json as jsonutils
|
|
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.cookies import MockRequest, MockResponse
|
|
from requests.cookies import RequestsCookieJar
|
|
from requests.cookies import merge_cookies, cookiejar_from_dict
|
|
from requests.packages.urllib3.response import HTTPResponse
|
|
import six
|
|
|
|
from requests_mock import compat
|
|
from requests_mock import exceptions
|
|
|
|
# Keyword arguments that describe the response body. They are mutually
# exclusive: at most one of them may be supplied for a single response.
_BODY_ARGS = frozenset(('raw', 'body', 'content', 'text', 'json'))

# The remaining keyword arguments that are accepted alongside a body.
_HTTP_ARGS = frozenset(('status_code', 'reason', 'headers', 'cookies'))

# Status code used when the caller does not provide one.
_DEFAULT_STATUS = 200
|
|
# Module-wide adapter instance. create_response() calls its build_response()
# method to build the response object from the raw HTTPResponse.
_http_adapter = HTTPAdapter()
|
|
|
|
|
|
class CookieJar(RequestsCookieJar):
    """A RequestsCookieJar whose ``set`` method carries full documentation."""

    def set(self, name, value, **kwargs):
        """Store a cookie in the jar.

        :param str name: the name (key) of the cookie.
        :param str value: the value of the cookie.
        :param int version: Integer or None. Netscape cookies have version 0.
            RFC 2965 and RFC 2109 cookies have a version cookie-attribute
            of 1. However, note that cookielib may 'downgrade' RFC 2109
            cookies to Netscape cookies, in which case version is 0.
        :param str port: String representing a port or a set of ports
            (eg. '80', or '80,8080'),
        :param str domain: The domain the cookie should apply to.
        :param str path: Cookie path (a string, eg. '/acme/rocket_launchers').
        :param bool secure: True if cookie should only be returned over a
            secure connection.
        :param int expires: Integer expiry date in seconds since epoch or
            None.
        :param bool discard: True if this is a session cookie.
        :param str comment: String comment from the server explaining the
            function of this cookie.
        :param str comment_url: URL linking to a comment from the server
            explaining the function of this cookie.
        :returns: whatever the parent class ``set`` returns.
        """
        # Pure pass-through: this override exists only so that the keyword
        # arguments above are documented on the method.
        parent = super(CookieJar, self)
        return parent.set(name, value, **kwargs)
|
|
|
|
|
|
def _check_body_arguments(**kwargs):
    """Validate the keyword arguments used to describe a response.

    :raises RuntimeError: if more than one of the body arguments
        (``_BODY_ARGS``) was given a value other than None.
    :raises TypeError: if a keyword is neither a body argument nor one of
        the recognised ``_HTTP_ARGS``.
    """
    # ``kwargs`` is this call's private dict, so popping from it does not
    # affect the caller. Every body argument is removed, set or not, so that
    # only the non-body keywords remain for the second check.
    supplied = []
    for body_arg in _BODY_ARGS:
        if kwargs.pop(body_arg, None) is not None:
            supplied.append(body_arg)

    # the body arguments are mutually exclusive
    if len(supplied) > 1:
        raise RuntimeError('You may only supply one body element. You '
                           'supplied %s' % ', '.join(supplied))

    unexpected = [key for key in kwargs if key not in _HTTP_ARGS]

    if unexpected:
        raise TypeError('Too many arguments provided. Unexpected '
                        'arguments %s.' % ', '.join(unexpected))
|
|
|
|
|
|
class _FakeConnection(object):
    """Stand-in for the socket-like connection attached to a response."""

    def send(self, request, **kwargs):
        """Always refuse to send: there is no real connection behind this.

        :raises exceptions.InvalidRequest: on every call.
        """
        raise exceptions.InvalidRequest(
            'This response was created without a connection. You are '
            'therefore unable to make a request directly on that connection.'
        )

    def close(self):
        """Do nothing; there is nothing to close."""
        return None
|
|
|
|
|
|
def _extract_cookies(request, response, cookies):
    """Fill in ``response.cookies`` for a mocked response.

    requests normally pulls cookies out of the headers on the
    original_response httplib.HTTPMessage. We never create that object, so
    the extraction has to be performed by hand here.

    :param request: the request the response is answering.
    :param response: the response whose cookie jar is updated in place.
    :param cookies: a CookieJar or dictionary of additional cookies to merge
        in; nothing is merged when this is falsy.
    """
    # Picks up cookies set by hand through a Set-Cookie or Set-Cookie2
    # header, although that route only allows 1 cookie to be set.
    fake_message = compat._FakeHTTPMessage(response.headers)
    mock_response = MockResponse(fake_message)
    mock_request = MockRequest(request)
    response.cookies.extract_cookies(mock_response, mock_request)

    if not cookies:
        return

    # A CookieJar or a dictionary may be handed to request_uri or straight to
    # create_response, which allows more than one cookie to be set.
    merge_cookies(response.cookies, cookies)
|
|
|
|
|
|
class _IOReader(six.BytesIO):
    """A BytesIO whose ``read`` behaves like a HTTPResponse once closed.

    Reading from a closed BytesIO raises a ValueError, whereas a HTTPResponse
    whose socket has been closed hands back an empty string. For
    compatibility this class follows the HTTPResponse behaviour.
    """

    def read(self, *args, **kwargs):
        """Read from the buffer, or return empty bytes if it is closed."""
        if not self.closed:
            # The base class is called explicitly rather than through super()
            # because it is not a new style object in python 2.
            return six.BytesIO.read(self, *args, **kwargs)

        return six.b('')
|
|
|
|
|
|
def create_response(request, **kwargs):
    """Build a response for ``request`` from the given description.

    At most one of ``raw``, ``body``, ``content``, ``text`` and ``json`` may
    be supplied.

    :param int status_code: The status code to return upon a successful
        match. Defaults to 200.
    :param HTTPResponse raw: A HTTPResponse object to return upon a
        successful match.
    :param io.IOBase body: An IO object with a read() method that can
        return a body on successful match.
    :param bytes content: A byte string to return upon a successful match.
    :param unicode text: A text string to return upon a successful match.
    :param object json: A python object to be converted to a JSON string
        and returned upon a successful match.
    :param dict headers: A dictionary object containing headers that are
        returned upon a successful match.
    :param CookieJar cookies: A cookie jar with cookies to set on the
        response.
    :param connection: Object stored as the response's ``connection``.
        A ``_FakeConnection`` is used when it is not supplied.
    :raises TypeError: if ``content`` is given but is not binary data, or
        ``text`` is given but is not string data.
    """
    connection = kwargs.pop('connection', _FakeConnection())

    _check_body_arguments(**kwargs)

    raw = kwargs.pop('raw', None)
    body = kwargs.pop('body', None)
    content = kwargs.pop('content', None)
    text = kwargs.pop('text', None)
    json = kwargs.pop('json', None)

    # Compare against None rather than relying on truthiness so that an
    # empty value of the wrong type is rejected as well.
    if not (content is None or isinstance(content, six.binary_type)):
        raise TypeError('Content should be binary data')
    if not (text is None or isinstance(text, six.string_types)):
        raise TypeError('Text should be string data')

    # Each richer form is reduced to the next simpler one:
    # json -> text -> content -> body.
    encoding = None

    if json is not None:
        text = jsonutils.dumps(json)

    if text is not None:
        encoding = 'utf-8'
        content = text.encode(encoding)

    if content is not None:
        body = _IOReader(content)

    if not raw:
        status = kwargs.get('status_code', _DEFAULT_STATUS)
        headers = kwargs.get('headers', {})
        reason = kwargs.get('reason')
        stream = body or _IOReader(six.b(''))

        raw = HTTPResponse(status=status,
                           headers=headers,
                           reason=reason,
                           body=stream,
                           decode_content=False,
                           preload_content=False,
                           original_response=compat._fake_http_response)

    response = _http_adapter.build_response(request, raw)
    response.connection = connection
    response.encoding = encoding

    _extract_cookies(request, response, kwargs.get('cookies'))

    return response
|
|
|
|
|
|
class _Context(object):
    """Holds the data in use while a URL match is being processed."""

    def __init__(self, headers, status_code, reason, cookies):
        """Record the response attributes exactly as they were given."""
        self.headers, self.status_code = headers, status_code
        self.reason, self.cookies = reason, cookies
|
|
|
|
|
|
class _MatcherResponse(object):
    """Holds the description of a response and builds it on demand."""

    def __init__(self, **kwargs):
        """Validate and store the response description.

        :param exc: An exception to raise from get_response instead of
            building a response. No other argument may accompany it.
        :raises TypeError: if other arguments are given together with
            ``exc``, if ``content`` is neither a callback nor binary data, or
            if ``text`` is neither a callback nor string data.
        :raises RuntimeError: if more than one body argument is supplied.
        """
        self._exc = kwargs.pop('exc', None)

        # A body or status would never be used when an exception is going to
        # be raised, so refuse the combination. This may be protecting the
        # user too much but can be removed later.
        if self._exc and kwargs:
            raise TypeError('Cannot provide other arguments with exc.')

        _check_body_arguments(**kwargs)
        self._params = kwargs

        # Type checking is generally discouraged in python, but doing it here
        # keeps the handling of types the same on python 2 and 3.
        content = self._params.get('content')
        text = self._params.get('text')

        content_ok = (content is None or
                      callable(content) or
                      isinstance(content, six.binary_type))
        if not content_ok:
            raise TypeError('Content should be a callback or binary data')

        text_ok = (text is None or
                   callable(text) or
                   isinstance(text, six.string_types))
        if not text_ok:
            raise TypeError('Text should be a callback or string data')

    def get_response(self, request):
        """Build the response for ``request``.

        Body elements that are callable are invoked with the request and a
        ``_Context`` object; the context's status code, reason, headers and
        cookies are read after the callbacks have run.

        :raises: the stored ``exc`` when one was requested.
        """
        # an error was requested, so raise it instead of building a response
        if self._exc:
            raise self._exc

        params = self._params

        # A cookie dict is converted so that the cookies object available in
        # a callback context is always a jar.
        cookies = params.get('cookies', CookieJar())
        if isinstance(cookies, dict):
            cookies = cookiejar_from_dict(cookies, CookieJar())

        headers = params.get('headers', {}).copy()
        status_code = params.get('status_code', _DEFAULT_STATUS)
        context = _Context(headers, status_code, params.get('reason'), cookies)

        def _resolve(value):
            # a body element that is a callback is executed to get its value
            if callable(value):
                return value(request, context)
            return value

        # Resolve the callbacks in a fixed order before the context is read,
        # since the context is handed to every callback.
        json_value = _resolve(params.get('json'))
        text_value = _resolve(params.get('text'))
        content_value = _resolve(params.get('content'))
        body_value = _resolve(params.get('body'))

        return create_response(request,
                               json=json_value,
                               text=text_value,
                               content=content_value,
                               body=body_value,
                               raw=params.get('raw'),
                               status_code=context.status_code,
                               reason=context.reason,
                               headers=context.headers,
                               cookies=context.cookies)
|