Attempt to get working in python2 and 3 at the same time

Finding https://github.com/concordusapps/wsgi-intercept from
@concordusapps inspired me to revisit this.

With their work, plus the stuff I've recently done for tiddlyweb
I had enough gumption to move things along.

I've chosen not to use six as the extent of differences is
quite small and I prefer having the changes be quite visible.
This commit is contained in:
Chris Dent
2013-11-01 23:34:19 +00:00
parent 48c213bc9e
commit 02a14c981c
10 changed files with 163 additions and 115 deletions

View File

@@ -1,4 +1,11 @@
language: python
python:
- 2.6
- 2.7
- 3.2
- 3.3
script: make test
install:
- pip install --use-mirrors \
`python -c 'from setup import META; print(" ".join(META["extras_require"]["testing"]))'`

View File

@@ -1,15 +1,25 @@
from setuptools import setup, find_packages
setup(
name = 'wsgi_intercept',
version = '0.1',
author = 'Titus Brown, Kumar McMillan, Chris Dent',
author_email = 'cdent@peermore.com',
description = 'wsgi_intercept installs a WSGI application in place of a real URI for testing.',
META = {
'name': 'wsgi_intercept',
'version': '0.2',
'author': 'Titus Brown, Kumar McMillan, Chris Dent',
'author_email': 'cdent@peermore.com',
'description': 'wsgi_intercept installs a WSGI application in place of a real URI for testing.',
# What will the name be?
#url="http://pypi.python.org/pypi/wsgi_intercept",
long_description = open('README.md').read(),
license = 'MIT License',
packages = find_packages(),
)
'long_description': open('README.md').read(),
'license': 'MIT License',
'packages': find_packages(),
'extras_require': {
'testing': [
'pytest',
'httplib2',
'requests'
],
},
}
if __name__ == '__main__':
setup(**META)

View File

@@ -1,22 +1,22 @@
import pytest
import sys
from wsgi_intercept import http_client_intercept
from socket import gaierror
import wsgi_intercept
from test import wsgi_app
import http.client
_saved_debuglevel = None
try:
import http.client as http_lib
except ImportError:
import httplib as http_lib
def http_install(port=80):
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
http_client_intercept.install()
wsgi_intercept.add_wsgi_intercept(
'some_hopefully_nonexistant_domain', port, wsgi_app.create_fn)
def http_uninstall(port=80):
wsgi_intercept.debuglevel = _saved_debuglevel
wsgi_intercept.remove_wsgi_intercept(
'some_hopefully_nonexistant_domain', port)
http_client_intercept.uninstall()
@@ -24,7 +24,7 @@ def http_uninstall(port=80):
def test_http_success():
http_install()
http_client = http.client.HTTPConnection(
http_client = http_lib.HTTPConnection(
'some_hopefully_nonexistant_domain')
http_client.request('GET', '/')
content = http_client.getresponse().read()
@@ -33,15 +33,17 @@ def test_http_success():
http_uninstall()
# https and http.client are not happy because of a recursion problem
# HTTPSConnection calls super in __init__
@pytest.mark.xfail
def test_https_success():
http_install(443)
http_client = http.client.HTTPSConnection(
'some_hopefully_nonexistant_domain')
http_client.request('GET', '/')
content = http_client.getresponse().read()
assert content == b'WSGI intercept successful!\n'
assert wsgi_app.success()
if sys.version_info[0] < 3:
http_client = http_lib.HTTPSConnection(
'some_hopefully_nonexistant_domain')
http_client.request('GET', '/')
content = http_client.getresponse().read()
assert content == b'WSGI intercept successful!\n'
assert wsgi_app.success()
else:
with pytest.raises(NotImplementedError):
http_client = http_lib.HTTPSConnection(
'some_hopefully_nonexistant_domain')
http_uninstall(443)

View File

@@ -7,11 +7,7 @@ import httplib2
import py.test
_saved_debuglevel = None
def install(port=80):
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
httplib2_intercept.install()
wsgi_intercept.add_wsgi_intercept(
'some_hopefully_nonexistant_domain',
@@ -19,7 +15,6 @@ def install(port=80):
def uninstall():
wsgi_intercept.debuglevel = _saved_debuglevel
httplib2_intercept.uninstall()
@@ -35,7 +30,6 @@ def test_success():
def test_bogus_domain():
install()
wsgi_intercept.debuglevel = 1
py.test.raises(gaierror,
'httplib2_intercept.HTTP_WSGIInterceptorWithTimeout("_nonexistant_domain_").connect()')
uninstall()

View File

@@ -1,29 +1,27 @@
import pytest
import urllib.request
try:
import urllib.request as url_lib
except ImportError:
import urllib2 as url_lib
from wsgi_intercept import urllib_intercept
import wsgi_intercept
from test import wsgi_app
_saved_debuglevel = None
def add_http_intercept(port=80):
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
wsgi_intercept.add_wsgi_intercept(
'some_hopefully_nonexistant_domain',
port, wsgi_app.create_fn)
def remove_intercept():
wsgi_intercept.debuglevel = _saved_debuglevel
wsgi_intercept.remove_wsgi_intercept()
def test_http():
add_http_intercept()
urllib_intercept.install_opener()
urllib.request.urlopen('http://some_hopefully_nonexistant_domain:80/')
url_lib.urlopen('http://some_hopefully_nonexistant_domain:80/')
assert wsgi_app.success()
remove_intercept()
@@ -31,7 +29,7 @@ def test_http():
def test_http_default_port():
add_http_intercept()
urllib_intercept.install_opener()
urllib.request.urlopen('http://some_hopefully_nonexistant_domain/')
url_lib.urlopen('http://some_hopefully_nonexistant_domain/')
assert wsgi_app.success()
remove_intercept()
@@ -39,7 +37,7 @@ def test_http_default_port():
def test_https():
add_http_intercept(443)
urllib_intercept.install_opener()
urllib.request.urlopen('https://some_hopefully_nonexistant_domain:443/')
url_lib.urlopen('https://some_hopefully_nonexistant_domain:443/')
assert wsgi_app.success()
remove_intercept()
@@ -47,6 +45,6 @@ def test_https():
def test_https_default_port():
add_http_intercept(443)
urllib_intercept.install_opener()
urllib.request.urlopen('https://some_hopefully_nonexistant_domain/')
url_lib.urlopen('https://some_hopefully_nonexistant_domain/')
assert wsgi_app.success()
remove_intercept()

View File

@@ -5,18 +5,13 @@ from test import wsgi_app
import httplib2
_saved_debuglevel = None
def http_install():
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
install()
wsgi_intercept.add_wsgi_intercept(
'some_hopefully_nonexistant_domain', 80, wsgi_app.create_fn)
def http_uninstall():
wsgi_intercept.debuglevel = _saved_debuglevel
wsgi_intercept.remove_wsgi_intercept(
'some_hopefully_nonexistant_domain', 80)
uninstall()
@@ -77,6 +72,6 @@ def test_encoding_errors():
with py.test.raises(UnicodeEncodeError):
response, content = http.request(
'http://some_hopefully_nonexistant_domain/boom/baz', 'GET',
headers={'Accept': 'application/\u2603'})
headers={'Accept': u'application/\u2603'})
http_uninstall()

View File

@@ -4,6 +4,11 @@ Simple WSGI applications for testing.
from pprint import pformat
try:
bytes
except ImportError:
bytes = str
_app_was_hit = False
_internals = {}
@@ -46,4 +51,4 @@ def more_interesting_app(environ, start_response):
_internals = environ
start_response('200 OK', [('Content-type', 'text/plain')])
return [bytes(pformat(environ), encoding='utf-8')]
return [pformat(environ).encode('utf-8')]

View File

@@ -106,11 +106,22 @@ failing tests, et cetera using the Issue Tracker.
.. _GitHub: http://github.com/cdent/python3-wsgi-intercept
"""
from __future__ import print_function
__version__ = '0.0.1'
import sys
from http.client import HTTPConnection
from io import BytesIO
try:
from http.client import HTTPConnection, HTTPSConnection
except ImportError:
from httplib import HTTPConnection, HTTPSConnection
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
import traceback
debuglevel = 0
@@ -404,17 +415,18 @@ class wsgi_fake_socket:
# return the concatenated results.
return BytesIO(self.output.getvalue())
def sendall(self, str):
def sendall(self, content):
"""
Save all the traffic to self.inp.
"""
if debuglevel >= 2:
print(">>>", str, ">>>")
print(">>>", content, ">>>")
try:
self.inp.write(str)
except TypeError:
self.inp.write(bytes([str]).decode('utf-8'))
self.inp.write(content)
except TypeError as exc:
print('type error', exc)
self.inp.write(content.decode('utf-8'))
def close(self):
"Do nothing, for now."
@@ -474,53 +486,48 @@ class WSGI_HTTPConnection(HTTPConnection):
# WSGI_HTTPSConnection
#
try:
from http.client import HTTPSConnection
except ImportError:
pass
else:
class WSGI_HTTPSConnection(HTTPSConnection, WSGI_HTTPConnection):
class WSGI_HTTPSConnection(HTTPSConnection, WSGI_HTTPConnection):
"""
Intercept all traffic to certain hosts & redirect into a WSGI
application object.
"""
def get_app(self, host, port):
"""
Intercept all traffic to certain hosts & redirect into a WSGI
application object.
Return the app object for the given (host, port).
"""
def get_app(self, host, port):
"""
Return the app object for the given (host, port).
"""
key = (host, int(port))
key = (host, int(port))
app, script_name = None, None
app, script_name = None, None
if key in _wsgi_intercept:
(app_fn, script_name) = _wsgi_intercept[key]
app = app_fn()
if key in _wsgi_intercept:
(app_fn, script_name) = _wsgi_intercept[key]
app = app_fn()
return app, script_name
return app, script_name
def connect(self):
"""
Override the connect() function to intercept calls to certain
host/ports.
def connect(self):
"""
Override the connect() function to intercept calls to certain
host/ports.
If no app at host/port has been registered for interception then
a normal HTTPSConnection is made.
"""
if debuglevel:
sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,))
If no app at host/port has been registered for interception then
a normal HTTPSConnection is made.
"""
if debuglevel:
sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,))
try:
(app, script_name) = self.get_app(self.host, self.port)
if app:
if debuglevel:
sys.stderr.write('INTERCEPTING call to %s:%s\n' %
(self.host, self.port,))
self.sock = wsgi_fake_socket(app, self.host, self.port,
script_name)
else:
HTTPSConnection.connect(self)
try:
(app, script_name) = self.get_app(self.host, self.port)
if app:
if debuglevel:
sys.stderr.write('INTERCEPTING call to %s:%s\n' %
(self.host, self.port,))
self.sock = wsgi_fake_socket(app, self.host, self.port,
script_name)
else:
HTTPSConnection.connect(self)
except Exception as e:
if debuglevel: # intercept & print out tracebacks
traceback.print_exc()
raise
except Exception as e:
if debuglevel: # intercept & print out tracebacks
traceback.print_exc()
raise

View File

@@ -8,20 +8,42 @@
# XXX: HTTPSConnection is currently not allowed as attempting
# to override it causes a recursion error.
import http.client
import wsgi_intercept
import sys
from http.client import (
HTTPConnection as OriginalHTTPConnection,
#HTTPSConnection as OriginalHTTPSConnection
)
SKIP_SSL = False
try:
import http.client as http_lib
SKIP_SSL = True
except ImportError:
import httplib as http_lib
from . import WSGI_HTTPConnection, WSGI_HTTPSConnection
try:
from http.client import (
HTTPConnection as OriginalHTTPConnection,
HTTPSConnection as OriginalHTTPSConnection
)
except ImportError:
from httplib import (
HTTPConnection as OriginalHTTPConnection,
HTTPSConnection as OriginalHTTPSConnection
)
class Error_HTTPSConnection(object):
def __init__(self, *args, **kwargs):
raise NotImplementedError('HTTPS temporarily not implemented')
def install():
http.client.HTTPConnection = wsgi_intercept.WSGI_HTTPConnection
#http.client.HTTPSConnection = wsgi_intercept.WSGI_HTTPSConnection
http_lib.HTTPConnection = WSGI_HTTPConnection
if SKIP_SSL:
http_lib.HTTPSConnection = Error_HTTPSConnection
else:
http_lib.HTTPSConnection = WSGI_HTTPSConnection
def uninstall():
http.client.HTTPConnection = OriginalHTTPConnection
#http.client.HTTPSConnection = OriginalHTTPSConnection
http_lib.HTTPConnection = OriginalHTTPConnection
http_lib.HTTPSConnection = OriginalHTTPSConnection

View File

@@ -1,10 +1,18 @@
from wsgi_intercept import WSGI_HTTPConnection, WSGI_HTTPSConnection
import urllib.request
from urllib.request import HTTPHandler, HTTPSHandler
try:
import urllib.request as url_lib
except ImportError:
import urllib2 as url_lib
try:
from urllib.request import HTTPHandler, HTTPSHandler
except ImportError:
from urllib2 import HTTPHandler, HTTPSHandler
from . import WSGI_HTTPConnection, WSGI_HTTPSConnection
class WSGI_HTTPHandler(HTTPHandler):
class WSGI_HTTPHandler(url_lib.HTTPHandler):
"""
Override the default HTTPHandler class with one that uses the
WSGI_HTTPConnection class to open HTTP URLs.
@@ -13,7 +21,7 @@ class WSGI_HTTPHandler(HTTPHandler):
return self.do_open(WSGI_HTTPConnection, req)
class WSGI_HTTPSHandler(HTTPSHandler):
class WSGI_HTTPSHandler(url_lib.HTTPSHandler):
"""
Override the default HTTPSHandler class with one that uses the
WSGI_HTTPConnection class to open HTTPS URLs.
@@ -26,11 +34,11 @@ def install_opener():
handlers = [WSGI_HTTPHandler()]
if WSGI_HTTPSHandler is not None:
handlers.append(WSGI_HTTPSHandler())
opener = urllib.request.build_opener(*handlers)
urllib.request.install_opener(opener)
opener = url_lib.build_opener(*handlers)
url_lib.install_opener(opener)
return opener
def uninstall_opener():
urllib.request.install_opener(None)
url_lib.install_opener(None)