Barebones of initial starting point

None of this works, this just imports the old code, leaving
behind those parts not deemed immediately relevant.
This commit is contained in:
Chris Dent
2012-10-02 15:55:06 +01:00
parent f5af98fa09
commit 12d4ef8288
15 changed files with 1039 additions and 0 deletions

9
Makefile Normal file
View File

@@ -0,0 +1,9 @@
clean:
find wsgi_intercept -name "*.pyc" |xargs rm || true
rm -r dist || true
rm -r build || true
rm -r wsgi_intercept.egg-info || true
rm *.bundle || true
rm -r *-bundle* || true

10
docs/README.txt Normal file
View File

@@ -0,0 +1,10 @@
Build docs as HTML with::
python setup.py build_docs
To publish docs to stdout in Google Code wiki format::
python setup.py publish_docs --google-user=x --google-password=x
Just use literally "x" for user / pass since logging in and publishing is not implemented. Yea!

8
docs/index.rst Normal file
View File

@@ -0,0 +1,8 @@
===============================================================================
wsgi_intercept: installs a WSGI application in place of a real URI for testing.
===============================================================================
.. contents::
.. include_docstring:: ./wsgi_intercept/__init__.py

49
setup.py Normal file
View File

@@ -0,0 +1,49 @@
from setuptools import setup, find_packages
import compiler, pydoc
from compiler import visitor
class ModuleVisitor(object):
def __init__(self):
self.mod_doc = None
self.mod_version = None
def default(self, node):
for child in node.getChildNodes():
self.visit(child)
def visitModule(self, node):
self.mod_doc = node.doc
self.default(node)
def visitAssign(self, node):
if self.mod_version:
return
asn = node.nodes[0]
assert asn.name == '__version__', (
"expected __version__ node: %s" % asn)
self.mod_version = node.expr.value
self.default(node)
def get_module_meta(modfile):
ast = compiler.parseFile(modfile)
modnode = ModuleVisitor()
visitor.walk(ast, modnode)
if modnode.mod_doc is None:
raise RuntimeError(
"could not parse doc string from %s" % modfile)
if modnode.mod_version is None:
raise RuntimeError(
"could not parse __version__ from %s" % modfile)
return (modnode.mod_version,) + pydoc.splitdoc(modnode.mod_doc)
version, description, long_description = get_module_meta("./wsgi_intercept/__init__.py")
setup(
name = 'wsgi_intercept',
version = version,
author = 'Titus Brown, Kumar McMillan, Chris Dent',
author_email = 'cdent@peermore.com',
description = description,
url="http://pypi.python.org/pypi/wsgi_intercept",
long_description = long_description,
license = 'MIT License',
packages = find_packages(),
)

2
test/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
import warnings
warnings.simplefilter('error')

48
test/test_httplib.py Normal file
View File

@@ -0,0 +1,48 @@
#! /usr/bin/env python2.4
from nose.tools import with_setup, raises, eq_
from wsgi_intercept import httplib_intercept
from socket import gaierror
import wsgi_intercept
from wsgi_intercept import test_wsgi_app
import httplib
_saved_debuglevel = None
def http_install():
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
httplib_intercept.install()
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, test_wsgi_app.create_fn)
def http_uninstall():
wsgi_intercept.debuglevel = _saved_debuglevel
wsgi_intercept.remove_wsgi_intercept('some_hopefully_nonexistant_domain', 80)
httplib_intercept.uninstall()
@with_setup(http_install, http_uninstall)
def test_http_success():
http = httplib.HTTPConnection('some_hopefully_nonexistant_domain')
http.request('GET', '/')
content = http.getresponse().read()
eq_(content, 'WSGI intercept successful!\n')
assert test_wsgi_app.success()
def https_install():
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
httplib_intercept.install()
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 443, test_wsgi_app.create_fn)
def https_uninstall():
wsgi_intercept.debuglevel = _saved_debuglevel
wsgi_intercept.remove_wsgi_intercept('some_hopefully_nonexistant_domain', 443)
httplib_intercept.uninstall()
@with_setup(https_install, https_uninstall)
def test_https_success():
http = httplib.HTTPSConnection('some_hopefully_nonexistant_domain')
http.request('GET', '/')
content = http.getresponse().read()
eq_(content, 'WSGI intercept successful!\n')
assert test_wsgi_app.success()

38
test/test_httplib2.py Normal file
View File

@@ -0,0 +1,38 @@
#! /usr/bin/env python2.4
from wsgi_intercept import httplib2_intercept
from nose.tools import with_setup, raises, eq_
from socket import gaierror
import wsgi_intercept
from wsgi_intercept import test_wsgi_app
import httplib2
_saved_debuglevel = None
def install(port=80):
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
httplib2_intercept.install()
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', port, test_wsgi_app.create_fn)
def uninstall():
wsgi_intercept.debuglevel = _saved_debuglevel
httplib2_intercept.uninstall()
@with_setup(install, uninstall)
def test_success():
http = httplib2.Http()
resp, content = http.request('http://some_hopefully_nonexistant_domain:80/', 'GET')
eq_(content, "WSGI intercept successful!\n")
assert test_wsgi_app.success()
@with_setup(install, uninstall)
@raises(gaierror)
def test_bogus_domain():
wsgi_intercept.debuglevel = 1;
httplib2_intercept.HTTP_WSGIInterceptorWithTimeout("_nonexistant_domain_").connect()
@with_setup(lambda: install(443), uninstall)
def test_https_success():
http = httplib2.Http()
resp, content = http.request('https://some_hopefully_nonexistant_domain/', 'GET')
assert test_wsgi_app.success()

View File

@@ -0,0 +1,49 @@
#! /usr/bin/env python2.4
from wsgi_intercept.httplib2_intercept import install, uninstall
import wsgi_intercept
import test.wsgi_app
import httplib2
_saved_debuglevel = None
def setup():
warnings.simplefilter("error")
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
install()
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, prudent_wsgi_app)
def test():
http = httplib2.Http()
resp, content = http.request('http://some_hopefully_nonexistant_domain:80/', 'GET')
assert test_wsgi_app.success()
def test_quoting_issue11():
# see http://code.google.com/p/wsgi-intercept/issues/detail?id=11
http = httplib2.Http()
inspected_env = {}
def make_path_checking_app():
def path_checking_app(environ, start_response):
inspected_env ['QUERY_STRING'] = environ['QUERY_STRING']
inspected_env ['PATH_INFO'] = environ['PATH_INFO']
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
return []
return path_checking_app
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, make_path_checking_app)
resp, content = http.request('http://some_hopefully_nonexistant_domain:80/spaced+words.html?word=something%20spaced', 'GET')
assert ('QUERY_STRING' in inspected_env and 'PATH_INFO' in inspected_env), "path_checking_app() was never called?"
eq_(inspected_env['PATH_INFO'], '/spaced+words.html')
eq_(inspected_env['QUERY_STRING'], 'word=something%20spaced')
def teardown():
warnings.resetwarnings()
wsgi_intercept.debuglevel = _saved_debuglevel
uninstall()
if __name__ == '__main__':
setup()
try:
test()
finally:
teardown()

45
test/test_wsgi_urllib2.py Normal file
View File

@@ -0,0 +1,45 @@
#! /usr/bin/env python
import sys, os.path
from nose.tools import with_setup
import urllib2
from wsgi_intercept import urllib2_intercept
import wsgi_intercept
from wsgi_intercept import test_wsgi_app
_saved_debuglevel = None
def add_http_intercept():
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, test_wsgi_app.create_fn)
def add_https_intercept():
_saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1
wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 443, test_wsgi_app.create_fn)
def remove_intercept():
wsgi_intercept.debuglevel = _saved_debuglevel
wsgi_intercept.remove_wsgi_intercept()
@with_setup(add_http_intercept, remove_intercept)
def test_http():
urllib2_intercept.install_opener()
urllib2.urlopen('http://some_hopefully_nonexistant_domain:80/')
assert test_wsgi_app.success()
@with_setup(add_http_intercept, remove_intercept)
def test_http_default_port():
urllib2_intercept.install_opener()
urllib2.urlopen('http://some_hopefully_nonexistant_domain/')
assert test_wsgi_app.success()
@with_setup(add_https_intercept, remove_intercept)
def test_https():
urllib2_intercept.install_opener()
urllib2.urlopen('https://some_hopefully_nonexistant_domain:443/')
assert test_wsgi_app.success()
@with_setup(add_https_intercept, remove_intercept)
def test_https_default_port():
urllib2_intercept.install_opener()
urllib2.urlopen('https://some_hopefully_nonexistant_domain/')
assert test_wsgi_app.success()

24
test/wsgi_app.py Normal file
View File

@@ -0,0 +1,24 @@
"""
A simple WSGI application for testing.
"""
_app_was_hit = False
def success():
return _app_was_hit
def simple_app(environ, start_response):
"""Simplest possible application object"""
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
global _app_was_hit
_app_was_hit = True
return ['WSGI intercept successful!\n']
def create_fn():
global _app_was_hit
_app_was_hit = False
return simple_app

617
wsgi_intercept/__init__.py Normal file
View File

@@ -0,0 +1,617 @@
"""installs a WSGI application in place of a real URI for testing.
Introduction
============
Testing a WSGI application normally involves starting a server at a local host and port, then pointing your test code to that address. Instead, this library lets you intercept calls to any specific host/port combination and redirect them into a `WSGI application`_ importable by your test program. Thus, you can avoid spawning multiple processes or threads to test your Web app.
How Does It Work?
=================
``wsgi_intercept`` works by replacing ``httplib.HTTPConnection`` with a subclass, ``wsgi_intercept.WSGI_HTTPConnection``. This class then redirects specific server/port combinations into a WSGI application by emulating a socket. If no intercept is registered for the host and port requested, those requests are passed on to the standard handler.
The functions ``add_wsgi_intercept(host, port, app_create_fn, script_name='')`` and ``remove_wsgi_intercept(host,port)`` specify which URLs should be redirect into what applications. Note especially that ``app_create_fn`` is a *function object* returning a WSGI application; ``script_name`` becomes ``SCRIPT_NAME`` in the WSGI app's environment, if set.
Install
=======
::
easy_install wsgi_intercept
(The ``easy_install`` command is bundled with the setuptools_ module)
To use a `development version`_ of wsgi_intercept, run::
easy_install http://wsgi-intercept.googlecode.com/svn/trunk
.. _setuptools: http://cheeseshop.python.org/pypi/setuptools/
.. _development version: http://wsgi-intercept.googlecode.com/svn/trunk/#egg=wsgi_intercept-dev
Packages Intercepted
====================
Unfortunately each of the Web testing frameworks uses its own specific mechanism for making HTTP call-outs, so individual implementations are needed. Below are the packages supported and how to create an intercept.
urllib2
-------
urllib2_ is a standard Python module, and ``urllib2.urlopen`` is a pretty
normal way to open URLs.
The following code will install the WSGI intercept stuff as a default
urllib2 handler: ::
>>> from wsgi_intercept.urllib2_intercept import install_opener
>>> install_opener() #doctest: +ELLIPSIS
<urllib2.OpenerDirector instance at ...>
>>> import wsgi_intercept
>>> from wsgi_intercept.test_wsgi_app import create_fn
>>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn)
>>> import urllib2
>>> urllib2.urlopen('http://some_host:80/').read()
'WSGI intercept successful!\\n'
The only tricky bit in there is that different handler classes need to
be constructed for Python 2.3 and Python 2.4, because the httplib
interface changed between those versions.
.. _urllib2: http://docs.python.org/lib/module-urllib2.html
httplib2
--------
httplib2_ is a 3rd party extension of the built-in ``httplib``. To intercept
requests, it is similar to urllib2::
>>> from wsgi_intercept.httplib2_intercept import install
>>> install()
>>> import wsgi_intercept
>>> from wsgi_intercept.test_wsgi_app import create_fn
>>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn)
>>> import httplib2
>>> resp, content = httplib2.Http().request('http://some_host:80/', 'GET')
>>> content
'WSGI intercept successful!\\n'
(Contributed by `David "Whit" Morris`_.)
.. _httplib2: http://code.google.com/p/httplib2/
.. _David "Whit" Morris: http://public.xdi.org/=whit
webtest
-------
webtest_ is an extension to ``unittest`` that has some nice functions for
testing Web sites.
To install the WSGI intercept handler, do ::
>>> import wsgi_intercept.webtest_intercept
>>> class WSGI_Test(wsgi_intercept.webtest_intercept.WebCase):
... HTTP_CONN = wsgi_intercept.WSGI_HTTPConnection
... HOST='localhost'
... PORT=80
...
... def setUp(self):
... wsgi_intercept.add_wsgi_intercept(self.HOST, self.PORT, create_fn)
...
>>>
.. _webtest: http://www.cherrypy.org/file/trunk/cherrypy/test/webtest.py
webunit
-------
webunit_ is another unittest-like framework that contains nice functions
for Web testing. (funkload_ uses webunit, too.)
webunit needed to be patched to support different scheme handlers.
The patched package is in webunit/wsgi_webunit/, and the only
file that was changed was webunittest.py; the original is in
webunittest-orig.py.
To install the WSGI intercept handler, do ::
>>> from httplib import HTTP
>>> import wsgi_intercept.webunit_intercept
>>> class WSGI_HTTP(HTTP):
... _connection_class = wsgi_intercept.WSGI_HTTPConnection
...
>>> class WSGI_WebTestCase(wsgi_intercept.webunit_intercept.WebTestCase):
... scheme_handlers = dict(http=WSGI_HTTP)
...
... def setUp(self):
... wsgi_intercept.add_wsgi_intercept('127.0.0.1', 80, create_fn)
...
>>>
.. _webunit: http://mechanicalcat.net/tech/webunit/
mechanize
---------
mechanize_ is John J. Lee's port of Perl's WWW::Mechanize to Python.
It mimics a browser. (It's also what's behind twill_.)
>>> import wsgi_intercept.mechanize_intercept
>>> from wsgi_intercept.test_wsgi_app import create_fn
>>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn)
>>> b = wsgi_intercept.mechanize_intercept.Browser()
>>> response = b.open('http://some_host:80')
>>> response.read()
'WSGI intercept successful!\\n'
.. _mechanize: http://wwwsearch.sf.net/
zope.testbrowser
----------------
zope.testbrowser_ is a prettified interface to mechanize_ that is used
primarily for testing Zope applications.
zope.testbrowser is also pretty easy ::
>>> import wsgi_intercept.zope_testbrowser
>>> from wsgi_intercept.test_wsgi_app import create_fn
>>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn)
>>> b = wsgi_intercept.zope_testbrowser.WSGI_Browser('http://some_host:80/')
>>> b.contents
'WSGI intercept successful!\\n'
.. _zope.testbrowser: http://www.python.org/pypi/zope.testbrowser
History
=======
Pursuant to Ian Bicking's `"best Web testing framework"`_ post,
Titus Brown put together an `in-process HTTP-to-WSGI interception mechanism`_ for
his own Web testing system, twill_. Because the mechanism is pretty
generic -- it works at the httplib level -- Titus decided to try adding it into
all of the *other* Python Web testing frameworks.
This is the result.
Mocking your HTTP Server
========================
Marc Hedlund has gone one further, and written a full-blown mock HTTP
server for wsgi_intercept. Combined with wsgi_intercept itself, this
lets you entirely replace client calls to a server with a mock setup
that hits neither the network nor server code. You can see his work
in the file ``mock_http.py``. Run ``mock_http.py`` to see a test.
.. _twill: http://www.idyll.org/~t/www-tools/twill.html
.. _"best Web testing framework": http://blog.ianbicking.org/best-of-the-web-app-test-frameworks.html
.. _in-process HTTP-to-WSGI interception mechanism: http://www.advogato.org/person/titus/diary.html?start=119
.. _WSGI application: http://www.python.org/peps/pep-0333.html
.. _funkload: http://funkload.nuxeo.org/
Project Home
============
If you aren't already there, this project lives on `Google Code`_. Please submit all bugs, patches, failing tests, et cetera using the `Issue Tracker`_
.. _Google Code: http://code.google.com/p/wsgi-intercept/
.. _Issue Tracker: http://code.google.com/p/wsgi-intercept/issues/list
"""
__version__ = '0.5.1'
import sys
from http.client import HTTPConnection
import urllib
from io import BytesIO
import traceback
debuglevel = 0
# 1 basic
# 2 verbose
####
#
# Specify which hosts/ports to target for interception to a given WSGI app.
#
# For simplicity's sake, intercept ENTIRE host/port combinations;
# intercepting only specific URL subtrees gets complicated, because we don't
# have that information in the HTTPConnection.connect() function that does the
# redirection.
#
# format: key=(host, port), value=(create_app, top_url)
#
# (top_url becomes the SCRIPT_NAME)
_wsgi_intercept = {}
def add_wsgi_intercept(host, port, app_create_fn, script_name=b''):
"""
Add a WSGI intercept call for host:port, using the app returned
by app_create_fn with a SCRIPT_NAME of 'script_name' (default '').
"""
_wsgi_intercept[(host, port)] = (app_create_fn, script_name)
def remove_wsgi_intercept(*args):
"""
Remove the WSGI intercept call for (host, port). If no arguments are given, removes all intercepts
"""
global _wsgi_intercept
if len(args)==0:
_wsgi_intercept = {}
else:
key = (args[0], args[1])
if _wsgi_intercept.has_key(key):
del _wsgi_intercept[key]
#
# make_environ: behave like a Web server. Take in 'input', and behave
# as if you're bound to 'host' and 'port'; build an environment dict
# for the WSGI app.
#
# This is where the magic happens, folks.
#
def make_environ(inp, host, port, script_name):
"""
Take 'inp' as if it were HTTP-speak being received on host:port,
and parse it into a WSGI-ok environment dictionary. Return the
dictionary.
Set 'SCRIPT_NAME' from the 'script_name' input, and, if present,
remove it from the beginning of the PATH_INFO variable.
"""
#
# parse the input up to the first blank line (or its end).
#
environ = {}
method_line = inp.readline()
content_type = None
content_length = None
cookies = []
for line in inp:
if not line.strip():
break
print('line', line)
k, v = line.strip().split(b':', 1)
v = v.lstrip()
#
# take care of special headers, and for the rest, put them
# into the environ with HTTP_ in front.
#
if k.lower() == b'content-type':
content_type = v
elif k.lower() == b'content-length':
content_length = v
elif k.lower() == b'cookie' or k.lower() == b'cookie2':
cookies.append(v)
else:
h = k.upper()
h = h.replace(b'-', b'_')
environ['HTTP_' + h.decode()] = v
if debuglevel >= 2:
print('HEADER:', k, v)
#
# decode the method line
#
if debuglevel >= 2:
print('METHOD LINE:', method_line)
method, url, protocol = method_line.split(b' ')
# clean the script_name off of the url, if it's there.
if not url.startswith(script_name):
script_name = '' # @CTB what to do -- bad URL. scrap?
else:
url = url[len(script_name):]
url = url.split(b'?', 1)
path_info = url[0]
query_string = ""
if len(url) == 2:
query_string = url[1]
if debuglevel:
print("method: %s; script_name: %s; path_info: %s; query_string: %s" %
(method, script_name, path_info, query_string))
r = inp.read()
inp = BytesIO(r)
#
# fill out our dictionary.
#
environ.update({ "wsgi.version" : (1,0),
"wsgi.url_scheme": "http",
"wsgi.input" : inp, # to read for POSTs
"wsgi.errors" : BytesIO(),
"wsgi.multithread" : 0,
"wsgi.multiprocess" : 0,
"wsgi.run_once" : 0,
"PATH_INFO" : path_info,
"QUERY_STRING" : query_string,
"REMOTE_ADDR" : '127.0.0.1',
"REQUEST_METHOD" : method,
"SCRIPT_NAME" : script_name,
"SERVER_NAME" : host,
"SERVER_PORT" : str(port),
"SERVER_PROTOCOL" : protocol,
})
#
# query_string, content_type & length are optional.
#
if query_string:
environ['QUERY_STRING'] = query_string
if content_type:
environ['CONTENT_TYPE'] = content_type
if debuglevel >= 2:
print('CONTENT-TYPE:', content_type)
if content_length:
environ['CONTENT_LENGTH'] = content_length
if debuglevel >= 2:
print('CONTENT-LENGTH:', content_length)
#
# handle cookies.
#
if cookies:
environ['HTTP_COOKIE'] = "; ".join(cookies)
if debuglevel:
print('WSGI environ dictionary:', environ)
return environ
#
# fake socket for WSGI intercept stuff.
#
class wsgi_fake_socket:
"""
Handle HTTP traffic and stuff into a WSGI application object instead.
Note that this class assumes:
1. 'makefile' is called (by the response class) only after all of the
data has been sent to the socket by the request class;
2. non-persistent (i.e. non-HTTP/1.1) connections.
"""
def __init__(self, app, host, port, script_name):
self.app = app # WSGI app object
self.host = host
self.port = port
self.script_name = script_name # SCRIPT_NAME (app mount point)
self.inp = BytesIO() # stuff written into this "socket"
self.write_results = [] # results from the 'write_fn'
self.results = None # results from running the app
self.output = BytesIO() # all output from the app, incl headers
def makefile(self, *args, **kwargs):
"""
'makefile' is called by the HTTPResponse class once all of the
data has been written. So, in this interceptor class, we need to:
1. build a start_response function that grabs all the headers
returned by the WSGI app;
2. create a wsgi.input file object 'inp', containing all of the
traffic;
3. build an environment dict out of the traffic in inp;
4. run the WSGI app & grab the result object;
5. concatenate & return the result(s) read from the result object.
@CTB: 'start_response' should return a function that writes
directly to self.result, too.
"""
# dynamically construct the start_response function for no good reason.
def start_response(status, headers, exc_info=None):
# construct the HTTP request.
self.output.write(b"HTTP/1.0 " + status.encode('utf-8') + b"\n")
for k, v in headers:
try:
k = k.encode('utf-8')
except AttributeError:
pass
try:
v = v.encode('utf-8')
except AttributeError:
pass
self.output.write(k + b':' + v + b"\n")
self.output.write(b'\n')
def write_fn(s):
self.write_results.append(s)
return write_fn
# construct the wsgi.input file from everything that's been
# written to this "socket".
inp = BytesIO(self.inp.getvalue())
# build the environ dictionary.
environ = make_environ(inp, self.host, self.port, self.script_name)
# run the application.
app_result = self.app(environ, start_response)
self.result = iter(app_result)
###
# read all of the results. the trick here is to get the *first*
# bit of data from the app via the generator, *then* grab & return
# the data passed back from the 'write' function, and then return
# the generator data. this is because the 'write' fn doesn't
# necessarily get called until the first result is requested from
# the app function.
#
# see twill tests, 'test_wrapper_intercept' for a test that breaks
# if this is done incorrectly.
try:
generator_data = None
try:
generator_data = next(self.result)
finally:
for data in self.write_results:
self.output.write(data)
if generator_data:
self.output.write(generator_data)
while 1:
data = next(self.result)
self.output.write(data)
except StopIteration:
pass
if hasattr(app_result, 'close'):
app_result.close()
if debuglevel >= 2:
print( "***", self.output.getvalue(), "***")
# return the concatenated results.
return BytesIO(self.output.getvalue())
def sendall(self, str):
"""
Save all the traffic to self.inp.
"""
if debuglevel >= 2:
print(">>>", str, ">>>")
try:
self.inp.write(str)
except TypeError:
self.inp.write(bytes([str]).decode('utf-8'))
def close(self):
"Do nothing, for now."
pass
#
# WSGI_HTTPConnection
#
class WSGI_HTTPConnection(HTTPConnection):
"""
Intercept all traffic to certain hosts & redirect into a WSGI
application object.
"""
def get_app(self, host, port):
"""
Return the app object for the given (host, port).
"""
key = (host, int(port))
app, script_name = None, None
if key in _wsgi_intercept:
(app_fn, script_name) = _wsgi_intercept[key]
app = app_fn()
return app, script_name
def connect(self):
"""
Override the connect() function to intercept calls to certain
host/ports.
If no app at host/port has been registered for interception then
a normal HTTPConnection is made.
"""
if debuglevel:
sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,))
try:
(app, script_name) = self.get_app(self.host, self.port)
if app:
if debuglevel:
sys.stderr.write('INTERCEPTING call to %s:%s\n' % \
(self.host, self.port,))
self.sock = wsgi_fake_socket(app, self.host, self.port,
script_name)
else:
HTTPConnection.connect(self)
except Exception as e:
if debuglevel: # intercept & print out tracebacks
traceback.print_exc()
raise
#
# WSGI_HTTPSConnection
#
try:
from http.client import HTTPSConnection
except ImportError:
pass
else:
class WSGI_HTTPSConnection(HTTPSConnection, WSGI_HTTPConnection):
"""
Intercept all traffic to certain hosts & redirect into a WSGI
application object.
"""
def get_app(self, host, port):
"""
Return the app object for the given (host, port).
"""
key = (host, int(port))
app, script_name = None, None
if _wsgi_intercept.has_key(key):
(app_fn, script_name) = _wsgi_intercept[key]
app = app_fn()
return app, script_name
def connect(self):
"""
Override the connect() function to intercept calls to certain
host/ports.
If no app at host/port has been registered for interception then
a normal HTTPSConnection is made.
"""
if debuglevel:
sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,))
try:
(app, script_name) = self.get_app(self.host, self.port)
if app:
if debuglevel:
sys.stderr.write('INTERCEPTING call to %s:%s\n' % \
(self.host, self.port,))
self.sock = wsgi_fake_socket(app, self.host, self.port,
script_name)
else:
HTTPSConnection.connect(self)
except Exception as e:
if debuglevel: # intercept & print out tracebacks
traceback.print_exc()
raise

View File

@@ -0,0 +1,50 @@
"""intercept HTTP connections that use httplib2
(see wsgi_intercept/__init__.py for examples)
"""
import httplib2
import wsgi_intercept
from httplib2 import SCHEME_TO_CONNECTION, HTTPConnectionWithTimeout, HTTPSConnectionWithTimeout
import sys
InterceptorMixin = wsgi_intercept.WSGI_HTTPConnection
# might make more sense as a decorator
def connect(self):
"""
Override the connect() function to intercept calls to certain
host/ports.
"""
if wsgi_intercept.debuglevel:
sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,))
(app, script_name) = self.get_app(self.host, self.port)
if app:
if wsgi_intercept.debuglevel:
sys.stderr.write('INTERCEPTING call to %s:%s\n' % \
(self.host, self.port,))
self.sock = wsgi_intercept.wsgi_fake_socket(app,
self.host, self.port,
script_name)
else:
self._connect()
class HTTP_WSGIInterceptorWithTimeout(HTTPConnectionWithTimeout, InterceptorMixin):
_connect = httplib2.HTTPConnectionWithTimeout.connect
connect = connect
class HTTPS_WSGIInterceptorWithTimeout(HTTPSConnectionWithTimeout, InterceptorMixin):
_connect = httplib2.HTTPSConnectionWithTimeout.connect
connect = connect
def install():
SCHEME_TO_CONNECTION['http'] = HTTP_WSGIInterceptorWithTimeout
SCHEME_TO_CONNECTION['https'] = HTTPS_WSGIInterceptorWithTimeout
def uninstall():
SCHEME_TO_CONNECTION['http'] = HTTPConnectionWithTimeout
SCHEME_TO_CONNECTION['https'] = HTTPSConnectionWithTimeout

View File

@@ -0,0 +1,21 @@
"""intercept HTTP connections that use httplib
(see wsgi_intercept/__init__.py for examples)
"""
import httplib
import wsgi_intercept
import sys
from httplib import (
HTTPConnection as OriginalHTTPConnection,
HTTPSConnection as OriginalHTTPSConnection)
def install():
httplib.HTTPConnection = wsgi_intercept.WSGI_HTTPConnection
httplib.HTTPSConnection = wsgi_intercept.WSGI_HTTPSConnection
def uninstall():
httplib.HTTPConnection = OriginalHTTPConnection
httplib.HTTPSConnection = OriginalHTTPSConnection

View File

@@ -0,0 +1,7 @@
"""intercept http requests made using the urllib2 module.
(see wsgi_intercept/__init__.py for examples)
"""
from wsgi_urllib2 import *

View File

@@ -0,0 +1,62 @@
import sys
from wsgi_intercept import WSGI_HTTPConnection
import urllib2, httplib
from urllib2 import HTTPHandler
from httplib import HTTP
#
# ugh, version dependence.
#
if sys.version_info[:2] == (2, 3):
class WSGI_HTTP(HTTP):
_connection_class = WSGI_HTTPConnection
class WSGI_HTTPHandler(HTTPHandler):
"""
Override the default HTTPHandler class with one that uses the
WSGI_HTTPConnection class to open HTTP URLs.
"""
def http_open(self, req):
return self.do_open(WSGI_HTTP, req)
# I'm not implementing HTTPS for 2.3 until someone complains about it! -Kumar
WSGI_HTTPSHandler = None
else:
class WSGI_HTTPHandler(HTTPHandler):
"""
Override the default HTTPHandler class with one that uses the
WSGI_HTTPConnection class to open HTTP URLs.
"""
def http_open(self, req):
return self.do_open(WSGI_HTTPConnection, req)
if hasattr(httplib, 'HTTPS'):
# urllib2 does this check as well, I assume it's to see if
# python was compiled with SSL support
from wsgi_intercept import WSGI_HTTPSConnection
from urllib2 import HTTPSHandler
class WSGI_HTTPSHandler(HTTPSHandler):
"""
Override the default HTTPSHandler class with one that uses the
WSGI_HTTPConnection class to open HTTPS URLs.
"""
def https_open(self, req):
return self.do_open(WSGI_HTTPSConnection, req)
else:
WSGI_HTTPSHandler = None
def install_opener():
handlers = [WSGI_HTTPHandler()]
if WSGI_HTTPSHandler is not None:
handlers.append(WSGI_HTTPSHandler())
opener = urllib2.build_opener(*handlers)
urllib2.install_opener(opener)
return opener
def uninstall_opener():
urllib2.install_opener(None)