Files
deb-python-gabbi/gabbi/tests/simple_wsgi.py
Chris Dent 00c119241b Experiment with implementation of a $COOKIE
$COOKIE is replaced by the cookie key and value pairs parsed out of
the set-cookie headers from the previous request.

This was straightforward to do and matches the behavior in
http://sandbox.tiddlyspace.com/_/979d0866-8959-4fed-9b3b-403002c20aa2
but for some reason it doesn't feel right. Needs more thought.
2016-05-15 18:56:09 +01:00

130 lines
4.7 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SimpleWsgi provides a WSGI callable that can be used in tests to
reflect posted data and otherwise confirm headers and queries.
"""
import json
from six.moves.urllib import parse as urlparse
CURRENT_POLL = 0
METHODS = ['GET', 'PUT', 'POST', 'DELETE', 'PATCH']
class SimpleWsgi(object):
"""A simple wsgi application to use in tests."""
def __call__(self, environ, start_response):
global METHODS
global CURRENT_POLL
request_method = environ['REQUEST_METHOD'].upper()
query_data = urlparse.parse_qs(environ.get('QUERY_STRING', ''))
request_url = environ.get('REQUEST_URI',
environ.get('RAW_URI', 'unknown'))
path_info = environ.get('PATH_INFO', '')
accept_header = environ.get('HTTP_ACCEPT')
content_type_header = environ.get('CONTENT_TYPE', '')
full_request_url = self._fully_qualify(environ, request_url)
if accept_header:
response_content_type = accept_header
else:
# JSON doesn't need a charset but we throw one in here
# to exercise the decoding code
response_content_type = (
'application/json ; charset=utf-8 ; stop=no')
headers = [
('X-Gabbi-method', request_method),
('Content-Type', response_content_type),
('X-Gabbi-url', full_request_url),
]
if request_method == 'DIE':
raise Exception('because you asked me to')
if request_method not in METHODS:
headers.append(
('Allow', ', '.join(METHODS)))
start_response('405 Method Not Allowed', headers)
return []
if request_method.startswith('P'):
body = environ['wsgi.input'].read()
if body:
if not content_type_header:
start_response('400 Bad request', headers)
return []
if content_type_header == 'application/json':
body_data = json.loads(body.decode('utf-8'))
if query_data:
query_data.update(body_data)
else:
query_data = body_data
headers.append(('Location', full_request_url))
if path_info == '/presenter':
start_response('200 OK', [('Content-Type', 'text/html')])
return [b"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello World</title>
</head>
<body>
<h1>Hello World</h1>
<p>lorem ipsum dolor sit amet</p>
</body>
</html>
"""]
elif path_info.startswith('/poller'):
if CURRENT_POLL == 0:
CURRENT_POLL = int(query_data.get('count', [5])[0])
start_response('400 Bad Reqest', [])
return []
else:
CURRENT_POLL -= 1
if CURRENT_POLL > 0:
start_response('400 Bad Reqest', [])
return []
else:
CURRENT_POLL = 0
# fall through if we've ended the loop
elif path_info == '/cookie':
headers.append(('Set-Cookie', 'session=1234; domain=.example.com'))
start_response('200 OK', headers)
query_output = json.dumps(query_data)
return [query_output.encode('utf-8')]
@staticmethod
def _fully_qualify(environ, url):
"""Turn a URL path into a fully qualified URL."""
split_url = urlparse.urlsplit(url)
server_name = environ.get('SERVER_NAME')
server_port = str(environ.get('SERVER_PORT'))
server_scheme = environ.get('wsgi.url_scheme')
if server_port not in ['80', '443']:
netloc = '%s:%s' % (server_name, server_port)
else:
netloc = server_name
return urlparse.urlunsplit((server_scheme, netloc, split_url.path,
split_url.query, split_url.fragment))