From 12d4ef8288007c3fbcfae503c614610a9ff002e7 Mon Sep 17 00:00:00 2001 From: Chris Dent Date: Tue, 2 Oct 2012 15:55:06 +0100 Subject: [PATCH] Barebones of initial starting point None of this works, this just imports the old code, leaving behind those parts not deemed immediately relevant. --- Makefile | 9 + docs/README.txt | 10 + docs/index.rst | 8 + setup.py | 49 ++ test/__init__.py | 2 + test/test_httplib.py | 48 ++ test/test_httplib2.py | 38 ++ test/test_wsgi_compliance.py | 49 ++ test/test_wsgi_urllib2.py | 45 ++ test/wsgi_app.py | 24 + wsgi_intercept/__init__.py | 617 ++++++++++++++++++ wsgi_intercept/httplib2_intercept.py | 50 ++ wsgi_intercept/httplib_intercept.py | 21 + wsgi_intercept/urllib2_intercept/__init__.py | 7 + .../urllib2_intercept/wsgi_urllib2.py | 62 ++ 15 files changed, 1039 insertions(+) create mode 100644 Makefile create mode 100644 docs/README.txt create mode 100644 docs/index.rst create mode 100644 setup.py create mode 100644 test/__init__.py create mode 100644 test/test_httplib.py create mode 100644 test/test_httplib2.py create mode 100644 test/test_wsgi_compliance.py create mode 100644 test/test_wsgi_urllib2.py create mode 100644 test/wsgi_app.py create mode 100644 wsgi_intercept/__init__.py create mode 100644 wsgi_intercept/httplib2_intercept.py create mode 100644 wsgi_intercept/httplib_intercept.py create mode 100644 wsgi_intercept/urllib2_intercept/__init__.py create mode 100644 wsgi_intercept/urllib2_intercept/wsgi_urllib2.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b6fb96b --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ + +clean: + find wsgi_intercept -name "*.pyc" |xargs rm || true + rm -r dist || true + rm -r build || true + rm -r wsgi_intercept.egg-info || true + rm *.bundle || true + rm -r *-bundle* || true + diff --git a/docs/README.txt b/docs/README.txt new file mode 100644 index 0000000..f2a841a --- /dev/null +++ b/docs/README.txt @@ -0,0 +1,10 @@ + +Build docs as HTML with:: + + python setup.py build_docs + +To publish docs to stdout in Google Code wiki format:: + + python setup.py publish_docs --google-user=x --google-password=x + +Just use literally "x" for user / pass since logging in and publishing is not implemented. Yea! \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..1118a1a --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,8 @@ +=============================================================================== +wsgi_intercept: installs a WSGI application in place of a real URI for testing. +=============================================================================== + +.. contents:: + +.. include_docstring:: ./wsgi_intercept/__init__.py + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..563089f --- /dev/null +++ b/setup.py @@ -0,0 +1,49 @@ + +from setuptools import setup, find_packages +import compiler, pydoc +from compiler import visitor + +class ModuleVisitor(object): + def __init__(self): + self.mod_doc = None + self.mod_version = None + def default(self, node): + for child in node.getChildNodes(): + self.visit(child) + def visitModule(self, node): + self.mod_doc = node.doc + self.default(node) + def visitAssign(self, node): + if self.mod_version: + return + asn = node.nodes[0] + assert asn.name == '__version__', ( + "expected __version__ node: %s" % asn) + self.mod_version = node.expr.value + self.default(node) + +def get_module_meta(modfile): + ast = compiler.parseFile(modfile) + modnode = ModuleVisitor() + visitor.walk(ast, modnode) + if modnode.mod_doc is None: + raise RuntimeError( + "could not parse doc string from %s" % modfile) + if modnode.mod_version is None: + raise RuntimeError( + "could not parse __version__ from %s" % modfile) + return (modnode.mod_version,) + pydoc.splitdoc(modnode.mod_doc) + +version, description, long_description = get_module_meta("./wsgi_intercept/__init__.py") + +setup( + name = 'wsgi_intercept', + version = version, + author = 'Titus Brown, Kumar McMillan, Chris Dent', + author_email = 'cdent@peermore.com', + description = description, + url="http://pypi.python.org/pypi/wsgi_intercept", + long_description = long_description, + license = 'MIT License', + packages = find_packages(), + ) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..0922ce5 --- /dev/null +++ b/test/__init__.py @@ -0,0 +1,2 @@ +import warnings +warnings.simplefilter('error') diff --git a/test/test_httplib.py b/test/test_httplib.py new file mode 100644 index 0000000..1cd3af7 --- /dev/null +++ b/test/test_httplib.py @@ -0,0 +1,48 @@ +#! /usr/bin/env python2.4 +from nose.tools import with_setup, raises, eq_ +from wsgi_intercept import httplib_intercept +from socket import gaierror +import wsgi_intercept +from wsgi_intercept import test_wsgi_app +import httplib + +_saved_debuglevel = None + + +def http_install(): + _saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1 + httplib_intercept.install() + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, test_wsgi_app.create_fn) + +def http_uninstall(): + wsgi_intercept.debuglevel = _saved_debuglevel + wsgi_intercept.remove_wsgi_intercept('some_hopefully_nonexistant_domain', 80) + httplib_intercept.uninstall() + +@with_setup(http_install, http_uninstall) +def test_http_success(): + http = httplib.HTTPConnection('some_hopefully_nonexistant_domain') + http.request('GET', '/') + content = http.getresponse().read() + eq_(content, 'WSGI intercept successful!\n') + assert test_wsgi_app.success() + + + +def https_install(): + _saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1 + httplib_intercept.install() + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 443, test_wsgi_app.create_fn) + +def https_uninstall(): + wsgi_intercept.debuglevel = _saved_debuglevel + wsgi_intercept.remove_wsgi_intercept('some_hopefully_nonexistant_domain', 443) + httplib_intercept.uninstall() + +@with_setup(https_install, https_uninstall) +def test_https_success(): + http = httplib.HTTPSConnection('some_hopefully_nonexistant_domain') + http.request('GET', '/') + content = http.getresponse().read() + eq_(content, 'WSGI intercept successful!\n') + assert test_wsgi_app.success() \ No newline at end of file diff --git a/test/test_httplib2.py b/test/test_httplib2.py new file mode 100644 index 0000000..33f3e0c --- /dev/null +++ b/test/test_httplib2.py @@ -0,0 +1,38 @@ +#! /usr/bin/env python2.4 +from wsgi_intercept import httplib2_intercept +from nose.tools import with_setup, raises, eq_ +from socket import gaierror +import wsgi_intercept +from wsgi_intercept import test_wsgi_app +import httplib2 + +_saved_debuglevel = None + + +def install(port=80): + _saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1 + httplib2_intercept.install() + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', port, test_wsgi_app.create_fn) + +def uninstall(): + wsgi_intercept.debuglevel = _saved_debuglevel + httplib2_intercept.uninstall() + +@with_setup(install, uninstall) +def test_success(): + http = httplib2.Http() + resp, content = http.request('http://some_hopefully_nonexistant_domain:80/', 'GET') + eq_(content, "WSGI intercept successful!\n") + assert test_wsgi_app.success() + +@with_setup(install, uninstall) +@raises(gaierror) +def test_bogus_domain(): + wsgi_intercept.debuglevel = 1; + httplib2_intercept.HTTP_WSGIInterceptorWithTimeout("_nonexistant_domain_").connect() + +@with_setup(lambda: install(443), uninstall) +def test_https_success(): + http = httplib2.Http() + resp, content = http.request('https://some_hopefully_nonexistant_domain/', 'GET') + assert test_wsgi_app.success() \ No newline at end of file diff --git a/test/test_wsgi_compliance.py b/test/test_wsgi_compliance.py new file mode 100644 index 0000000..d9a70b5 --- /dev/null +++ b/test/test_wsgi_compliance.py @@ -0,0 +1,49 @@ +#! /usr/bin/env python2.4 +from wsgi_intercept.httplib2_intercept import install, uninstall +import wsgi_intercept +import test.wsgi_app +import httplib2 + +_saved_debuglevel = None + +def setup(): + warnings.simplefilter("error") + _saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1 + install() + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, prudent_wsgi_app) + +def test(): + http = httplib2.Http() + resp, content = http.request('http://some_hopefully_nonexistant_domain:80/', 'GET') + assert test_wsgi_app.success() + +def test_quoting_issue11(): + # see http://code.google.com/p/wsgi-intercept/issues/detail?id=11 + http = httplib2.Http() + inspected_env = {} + def make_path_checking_app(): + def path_checking_app(environ, start_response): + inspected_env ['QUERY_STRING'] = environ['QUERY_STRING'] + inspected_env ['PATH_INFO'] = environ['PATH_INFO'] + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + return [] + return path_checking_app + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, make_path_checking_app) + resp, content = http.request('http://some_hopefully_nonexistant_domain:80/spaced+words.html?word=something%20spaced', 'GET') + assert ('QUERY_STRING' in inspected_env and 'PATH_INFO' in inspected_env), "path_checking_app() was never called?" + eq_(inspected_env['PATH_INFO'], '/spaced+words.html') + eq_(inspected_env['QUERY_STRING'], 'word=something%20spaced') + +def teardown(): + warnings.resetwarnings() + wsgi_intercept.debuglevel = _saved_debuglevel + uninstall() + +if __name__ == '__main__': + setup() + try: + test() + finally: + teardown() diff --git a/test/test_wsgi_urllib2.py b/test/test_wsgi_urllib2.py new file mode 100644 index 0000000..f2650f7 --- /dev/null +++ b/test/test_wsgi_urllib2.py @@ -0,0 +1,45 @@ +#! /usr/bin/env python +import sys, os.path +from nose.tools import with_setup +import urllib2 +from wsgi_intercept import urllib2_intercept +import wsgi_intercept +from wsgi_intercept import test_wsgi_app + +_saved_debuglevel = None + +def add_http_intercept(): + _saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1 + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 80, test_wsgi_app.create_fn) + +def add_https_intercept(): + _saved_debuglevel, wsgi_intercept.debuglevel = wsgi_intercept.debuglevel, 1 + wsgi_intercept.add_wsgi_intercept('some_hopefully_nonexistant_domain', 443, test_wsgi_app.create_fn) + +def remove_intercept(): + wsgi_intercept.debuglevel = _saved_debuglevel + wsgi_intercept.remove_wsgi_intercept() + +@with_setup(add_http_intercept, remove_intercept) +def test_http(): + urllib2_intercept.install_opener() + urllib2.urlopen('http://some_hopefully_nonexistant_domain:80/') + assert test_wsgi_app.success() + +@with_setup(add_http_intercept, remove_intercept) +def test_http_default_port(): + urllib2_intercept.install_opener() + urllib2.urlopen('http://some_hopefully_nonexistant_domain/') + assert test_wsgi_app.success() + +@with_setup(add_https_intercept, remove_intercept) +def test_https(): + urllib2_intercept.install_opener() + urllib2.urlopen('https://some_hopefully_nonexistant_domain:443/') + assert test_wsgi_app.success() + +@with_setup(add_https_intercept, remove_intercept) +def test_https_default_port(): + urllib2_intercept.install_opener() + urllib2.urlopen('https://some_hopefully_nonexistant_domain/') + assert test_wsgi_app.success() \ No newline at end of file diff --git a/test/wsgi_app.py b/test/wsgi_app.py new file mode 100644 index 0000000..33a3de6 --- /dev/null +++ b/test/wsgi_app.py @@ -0,0 +1,24 @@ +""" +A simple WSGI application for testing. +""" + +_app_was_hit = False + +def success(): + return _app_was_hit + +def simple_app(environ, start_response): + """Simplest possible application object""" + status = '200 OK' + response_headers = [('Content-type','text/plain')] + start_response(status, response_headers) + + global _app_was_hit + _app_was_hit = True + + return ['WSGI intercept successful!\n'] + +def create_fn(): + global _app_was_hit + _app_was_hit = False + return simple_app diff --git a/wsgi_intercept/__init__.py b/wsgi_intercept/__init__.py new file mode 100644 index 0000000..d742680 --- /dev/null +++ b/wsgi_intercept/__init__.py @@ -0,0 +1,617 @@ + +"""installs a WSGI application in place of a real URI for testing. + +Introduction +============ + +Testing a WSGI application normally involves starting a server at a local host and port, then pointing your test code to that address. Instead, this library lets you intercept calls to any specific host/port combination and redirect them into a `WSGI application`_ importable by your test program. Thus, you can avoid spawning multiple processes or threads to test your Web app. + +How Does It Work? +================= + +``wsgi_intercept`` works by replacing ``httplib.HTTPConnection`` with a subclass, ``wsgi_intercept.WSGI_HTTPConnection``. This class then redirects specific server/port combinations into a WSGI application by emulating a socket. If no intercept is registered for the host and port requested, those requests are passed on to the standard handler. + +The functions ``add_wsgi_intercept(host, port, app_create_fn, script_name='')`` and ``remove_wsgi_intercept(host,port)`` specify which URLs should be redirect into what applications. Note especially that ``app_create_fn`` is a *function object* returning a WSGI application; ``script_name`` becomes ``SCRIPT_NAME`` in the WSGI app's environment, if set. + +Install +======= + +:: + + easy_install wsgi_intercept + +(The ``easy_install`` command is bundled with the setuptools_ module) + +To use a `development version`_ of wsgi_intercept, run:: + + easy_install http://wsgi-intercept.googlecode.com/svn/trunk + +.. _setuptools: http://cheeseshop.python.org/pypi/setuptools/ +.. _development version: http://wsgi-intercept.googlecode.com/svn/trunk/#egg=wsgi_intercept-dev + +Packages Intercepted +==================== + +Unfortunately each of the Web testing frameworks uses its own specific mechanism for making HTTP call-outs, so individual implementations are needed. Below are the packages supported and how to create an intercept. + +urllib2 +------- + +urllib2_ is a standard Python module, and ``urllib2.urlopen`` is a pretty +normal way to open URLs. + +The following code will install the WSGI intercept stuff as a default +urllib2 handler: :: + + >>> from wsgi_intercept.urllib2_intercept import install_opener + >>> install_opener() #doctest: +ELLIPSIS + + >>> import wsgi_intercept + >>> from wsgi_intercept.test_wsgi_app import create_fn + >>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn) + >>> import urllib2 + >>> urllib2.urlopen('http://some_host:80/').read() + 'WSGI intercept successful!\\n' + +The only tricky bit in there is that different handler classes need to +be constructed for Python 2.3 and Python 2.4, because the httplib +interface changed between those versions. + +.. _urllib2: http://docs.python.org/lib/module-urllib2.html + +httplib2 +-------- + +httplib2_ is a 3rd party extension of the built-in ``httplib``. To intercept +requests, it is similar to urllib2:: + + >>> from wsgi_intercept.httplib2_intercept import install + >>> install() + >>> import wsgi_intercept + >>> from wsgi_intercept.test_wsgi_app import create_fn + >>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn) + >>> import httplib2 + >>> resp, content = httplib2.Http().request('http://some_host:80/', 'GET') + >>> content + 'WSGI intercept successful!\\n' + +(Contributed by `David "Whit" Morris`_.) + +.. _httplib2: http://code.google.com/p/httplib2/ +.. _David "Whit" Morris: http://public.xdi.org/=whit + +webtest +------- + +webtest_ is an extension to ``unittest`` that has some nice functions for +testing Web sites. + +To install the WSGI intercept handler, do :: + + >>> import wsgi_intercept.webtest_intercept + >>> class WSGI_Test(wsgi_intercept.webtest_intercept.WebCase): + ... HTTP_CONN = wsgi_intercept.WSGI_HTTPConnection + ... HOST='localhost' + ... PORT=80 + ... + ... def setUp(self): + ... wsgi_intercept.add_wsgi_intercept(self.HOST, self.PORT, create_fn) + ... + >>> + +.. _webtest: http://www.cherrypy.org/file/trunk/cherrypy/test/webtest.py + +webunit +------- + +webunit_ is another unittest-like framework that contains nice functions +for Web testing. (funkload_ uses webunit, too.) + +webunit needed to be patched to support different scheme handlers. +The patched package is in webunit/wsgi_webunit/, and the only +file that was changed was webunittest.py; the original is in +webunittest-orig.py. + +To install the WSGI intercept handler, do :: + + >>> from httplib import HTTP + >>> import wsgi_intercept.webunit_intercept + >>> class WSGI_HTTP(HTTP): + ... _connection_class = wsgi_intercept.WSGI_HTTPConnection + ... + >>> class WSGI_WebTestCase(wsgi_intercept.webunit_intercept.WebTestCase): + ... scheme_handlers = dict(http=WSGI_HTTP) + ... + ... def setUp(self): + ... wsgi_intercept.add_wsgi_intercept('127.0.0.1', 80, create_fn) + ... + >>> + +.. _webunit: http://mechanicalcat.net/tech/webunit/ + +mechanize +--------- + +mechanize_ is John J. Lee's port of Perl's WWW::Mechanize to Python. +It mimics a browser. (It's also what's behind twill_.) + + >>> import wsgi_intercept.mechanize_intercept + >>> from wsgi_intercept.test_wsgi_app import create_fn + >>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn) + >>> b = wsgi_intercept.mechanize_intercept.Browser() + >>> response = b.open('http://some_host:80') + >>> response.read() + 'WSGI intercept successful!\\n' + +.. _mechanize: http://wwwsearch.sf.net/ + +zope.testbrowser +---------------- + +zope.testbrowser_ is a prettified interface to mechanize_ that is used +primarily for testing Zope applications. + +zope.testbrowser is also pretty easy :: + + >>> import wsgi_intercept.zope_testbrowser + >>> from wsgi_intercept.test_wsgi_app import create_fn + >>> wsgi_intercept.add_wsgi_intercept('some_host', 80, create_fn) + >>> b = wsgi_intercept.zope_testbrowser.WSGI_Browser('http://some_host:80/') + >>> b.contents + 'WSGI intercept successful!\\n' + +.. _zope.testbrowser: http://www.python.org/pypi/zope.testbrowser + +History +======= + +Pursuant to Ian Bicking's `"best Web testing framework"`_ post, +Titus Brown put together an `in-process HTTP-to-WSGI interception mechanism`_ for +his own Web testing system, twill_. Because the mechanism is pretty +generic -- it works at the httplib level -- Titus decided to try adding it into +all of the *other* Python Web testing frameworks. + +This is the result. + +Mocking your HTTP Server +======================== + +Marc Hedlund has gone one further, and written a full-blown mock HTTP +server for wsgi_intercept. Combined with wsgi_intercept itself, this +lets you entirely replace client calls to a server with a mock setup +that hits neither the network nor server code. You can see his work +in the file ``mock_http.py``. Run ``mock_http.py`` to see a test. + + +.. _twill: http://www.idyll.org/~t/www-tools/twill.html +.. _"best Web testing framework": http://blog.ianbicking.org/best-of-the-web-app-test-frameworks.html +.. _in-process HTTP-to-WSGI interception mechanism: http://www.advogato.org/person/titus/diary.html?start=119 +.. _WSGI application: http://www.python.org/peps/pep-0333.html +.. _funkload: http://funkload.nuxeo.org/ + +Project Home +============ + +If you aren't already there, this project lives on `Google Code`_. Please submit all bugs, patches, failing tests, et cetera using the `Issue Tracker`_ + +.. _Google Code: http://code.google.com/p/wsgi-intercept/ +.. _Issue Tracker: http://code.google.com/p/wsgi-intercept/issues/list + +""" +__version__ = '0.5.1' + +import sys +from http.client import HTTPConnection +import urllib +from io import BytesIO +import traceback + +debuglevel = 0 +# 1 basic +# 2 verbose + +#### + +# +# Specify which hosts/ports to target for interception to a given WSGI app. +# +# For simplicity's sake, intercept ENTIRE host/port combinations; +# intercepting only specific URL subtrees gets complicated, because we don't +# have that information in the HTTPConnection.connect() function that does the +# redirection. +# +# format: key=(host, port), value=(create_app, top_url) +# +# (top_url becomes the SCRIPT_NAME) + +_wsgi_intercept = {} + +def add_wsgi_intercept(host, port, app_create_fn, script_name=b''): + """ + Add a WSGI intercept call for host:port, using the app returned + by app_create_fn with a SCRIPT_NAME of 'script_name' (default ''). + """ + _wsgi_intercept[(host, port)] = (app_create_fn, script_name) + +def remove_wsgi_intercept(*args): + """ + Remove the WSGI intercept call for (host, port). If no arguments are given, removes all intercepts + """ + global _wsgi_intercept + if len(args)==0: + _wsgi_intercept = {} + else: + key = (args[0], args[1]) + if _wsgi_intercept.has_key(key): + del _wsgi_intercept[key] + +# +# make_environ: behave like a Web server. Take in 'input', and behave +# as if you're bound to 'host' and 'port'; build an environment dict +# for the WSGI app. +# +# This is where the magic happens, folks. +# + +def make_environ(inp, host, port, script_name): + """ + Take 'inp' as if it were HTTP-speak being received on host:port, + and parse it into a WSGI-ok environment dictionary. Return the + dictionary. + + Set 'SCRIPT_NAME' from the 'script_name' input, and, if present, + remove it from the beginning of the PATH_INFO variable. + """ + # + # parse the input up to the first blank line (or its end). + # + + environ = {} + + method_line = inp.readline() + + content_type = None + content_length = None + cookies = [] + + for line in inp: + if not line.strip(): + break + + print('line', line) + k, v = line.strip().split(b':', 1) + v = v.lstrip() + + # + # take care of special headers, and for the rest, put them + # into the environ with HTTP_ in front. + # + + if k.lower() == b'content-type': + content_type = v + elif k.lower() == b'content-length': + content_length = v + elif k.lower() == b'cookie' or k.lower() == b'cookie2': + cookies.append(v) + else: + h = k.upper() + h = h.replace(b'-', b'_') + environ['HTTP_' + h.decode()] = v + + if debuglevel >= 2: + print('HEADER:', k, v) + + # + # decode the method line + # + + if debuglevel >= 2: + print('METHOD LINE:', method_line) + + method, url, protocol = method_line.split(b' ') + + # clean the script_name off of the url, if it's there. + if not url.startswith(script_name): + script_name = '' # @CTB what to do -- bad URL. scrap? + else: + url = url[len(script_name):] + + url = url.split(b'?', 1) + path_info = url[0] + query_string = "" + if len(url) == 2: + query_string = url[1] + + if debuglevel: + print("method: %s; script_name: %s; path_info: %s; query_string: %s" % + (method, script_name, path_info, query_string)) + + r = inp.read() + inp = BytesIO(r) + + # + # fill out our dictionary. + # + + environ.update({ "wsgi.version" : (1,0), + "wsgi.url_scheme": "http", + "wsgi.input" : inp, # to read for POSTs + "wsgi.errors" : BytesIO(), + "wsgi.multithread" : 0, + "wsgi.multiprocess" : 0, + "wsgi.run_once" : 0, + + "PATH_INFO" : path_info, + "QUERY_STRING" : query_string, + "REMOTE_ADDR" : '127.0.0.1', + "REQUEST_METHOD" : method, + "SCRIPT_NAME" : script_name, + "SERVER_NAME" : host, + "SERVER_PORT" : str(port), + "SERVER_PROTOCOL" : protocol, + }) + + # + # query_string, content_type & length are optional. + # + + if query_string: + environ['QUERY_STRING'] = query_string + + if content_type: + environ['CONTENT_TYPE'] = content_type + if debuglevel >= 2: + print('CONTENT-TYPE:', content_type) + if content_length: + environ['CONTENT_LENGTH'] = content_length + if debuglevel >= 2: + print('CONTENT-LENGTH:', content_length) + + # + # handle cookies. + # + if cookies: + environ['HTTP_COOKIE'] = "; ".join(cookies) + + if debuglevel: + print('WSGI environ dictionary:', environ) + + return environ + +# +# fake socket for WSGI intercept stuff. +# + +class wsgi_fake_socket: + """ + Handle HTTP traffic and stuff into a WSGI application object instead. + + Note that this class assumes: + + 1. 'makefile' is called (by the response class) only after all of the + data has been sent to the socket by the request class; + 2. non-persistent (i.e. non-HTTP/1.1) connections. + """ + def __init__(self, app, host, port, script_name): + self.app = app # WSGI app object + self.host = host + self.port = port + self.script_name = script_name # SCRIPT_NAME (app mount point) + + self.inp = BytesIO() # stuff written into this "socket" + self.write_results = [] # results from the 'write_fn' + self.results = None # results from running the app + self.output = BytesIO() # all output from the app, incl headers + + def makefile(self, *args, **kwargs): + """ + 'makefile' is called by the HTTPResponse class once all of the + data has been written. So, in this interceptor class, we need to: + + 1. build a start_response function that grabs all the headers + returned by the WSGI app; + 2. create a wsgi.input file object 'inp', containing all of the + traffic; + 3. build an environment dict out of the traffic in inp; + 4. run the WSGI app & grab the result object; + 5. concatenate & return the result(s) read from the result object. + + @CTB: 'start_response' should return a function that writes + directly to self.result, too. + """ + + # dynamically construct the start_response function for no good reason. + + def start_response(status, headers, exc_info=None): + # construct the HTTP request. + self.output.write(b"HTTP/1.0 " + status.encode('utf-8') + b"\n") + + for k, v in headers: + try: + k = k.encode('utf-8') + except AttributeError: + pass + try: + v = v.encode('utf-8') + except AttributeError: + pass + self.output.write(k + b':' + v + b"\n") + self.output.write(b'\n') + + def write_fn(s): + self.write_results.append(s) + return write_fn + + # construct the wsgi.input file from everything that's been + # written to this "socket". + inp = BytesIO(self.inp.getvalue()) + + # build the environ dictionary. + environ = make_environ(inp, self.host, self.port, self.script_name) + + # run the application. + app_result = self.app(environ, start_response) + self.result = iter(app_result) + + ### + + # read all of the results. the trick here is to get the *first* + # bit of data from the app via the generator, *then* grab & return + # the data passed back from the 'write' function, and then return + # the generator data. this is because the 'write' fn doesn't + # necessarily get called until the first result is requested from + # the app function. + # + # see twill tests, 'test_wrapper_intercept' for a test that breaks + # if this is done incorrectly. + + try: + generator_data = None + try: + generator_data = next(self.result) + + finally: + for data in self.write_results: + self.output.write(data) + + if generator_data: + self.output.write(generator_data) + + while 1: + data = next(self.result) + self.output.write(data) + + except StopIteration: + pass + + if hasattr(app_result, 'close'): + app_result.close() + + if debuglevel >= 2: + print( "***", self.output.getvalue(), "***") + + # return the concatenated results. + return BytesIO(self.output.getvalue()) + + def sendall(self, str): + """ + Save all the traffic to self.inp. + """ + if debuglevel >= 2: + print(">>>", str, ">>>") + + try: + self.inp.write(str) + except TypeError: + self.inp.write(bytes([str]).decode('utf-8')) + + + def close(self): + "Do nothing, for now." + pass + +# +# WSGI_HTTPConnection +# + +class WSGI_HTTPConnection(HTTPConnection): + """ + Intercept all traffic to certain hosts & redirect into a WSGI + application object. + """ + def get_app(self, host, port): + """ + Return the app object for the given (host, port). + """ + key = (host, int(port)) + + app, script_name = None, None + + if key in _wsgi_intercept: + (app_fn, script_name) = _wsgi_intercept[key] + app = app_fn() + + return app, script_name + + def connect(self): + """ + Override the connect() function to intercept calls to certain + host/ports. + + If no app at host/port has been registered for interception then + a normal HTTPConnection is made. + """ + if debuglevel: + sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,)) + + try: + (app, script_name) = self.get_app(self.host, self.port) + if app: + if debuglevel: + sys.stderr.write('INTERCEPTING call to %s:%s\n' % \ + (self.host, self.port,)) + self.sock = wsgi_fake_socket(app, self.host, self.port, + script_name) + else: + HTTPConnection.connect(self) + + except Exception as e: + if debuglevel: # intercept & print out tracebacks + traceback.print_exc() + raise + +# +# WSGI_HTTPSConnection +# + +try: + from http.client import HTTPSConnection +except ImportError: + pass +else: + class WSGI_HTTPSConnection(HTTPSConnection, WSGI_HTTPConnection): + """ + Intercept all traffic to certain hosts & redirect into a WSGI + application object. + """ + def get_app(self, host, port): + """ + Return the app object for the given (host, port). + """ + key = (host, int(port)) + + app, script_name = None, None + + if _wsgi_intercept.has_key(key): + (app_fn, script_name) = _wsgi_intercept[key] + app = app_fn() + + return app, script_name + + def connect(self): + """ + Override the connect() function to intercept calls to certain + host/ports. + + If no app at host/port has been registered for interception then + a normal HTTPSConnection is made. + """ + if debuglevel: + sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,)) + + try: + (app, script_name) = self.get_app(self.host, self.port) + if app: + if debuglevel: + sys.stderr.write('INTERCEPTING call to %s:%s\n' % \ + (self.host, self.port,)) + self.sock = wsgi_fake_socket(app, self.host, self.port, + script_name) + else: + HTTPSConnection.connect(self) + + except Exception as e: + if debuglevel: # intercept & print out tracebacks + traceback.print_exc() + raise + diff --git a/wsgi_intercept/httplib2_intercept.py b/wsgi_intercept/httplib2_intercept.py new file mode 100644 index 0000000..4fae09c --- /dev/null +++ b/wsgi_intercept/httplib2_intercept.py @@ -0,0 +1,50 @@ + +"""intercept HTTP connections that use httplib2 + +(see wsgi_intercept/__init__.py for examples) + +""" + +import httplib2 +import wsgi_intercept +from httplib2 import SCHEME_TO_CONNECTION, HTTPConnectionWithTimeout, HTTPSConnectionWithTimeout +import sys + +InterceptorMixin = wsgi_intercept.WSGI_HTTPConnection + +# might make more sense as a decorator + +def connect(self): + """ + Override the connect() function to intercept calls to certain + host/ports. + """ + if wsgi_intercept.debuglevel: + sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,)) + + (app, script_name) = self.get_app(self.host, self.port) + if app: + if wsgi_intercept.debuglevel: + sys.stderr.write('INTERCEPTING call to %s:%s\n' % \ + (self.host, self.port,)) + self.sock = wsgi_intercept.wsgi_fake_socket(app, + self.host, self.port, + script_name) + else: + self._connect() + +class HTTP_WSGIInterceptorWithTimeout(HTTPConnectionWithTimeout, InterceptorMixin): + _connect = httplib2.HTTPConnectionWithTimeout.connect + connect = connect + +class HTTPS_WSGIInterceptorWithTimeout(HTTPSConnectionWithTimeout, InterceptorMixin): + _connect = httplib2.HTTPSConnectionWithTimeout.connect + connect = connect + +def install(): + SCHEME_TO_CONNECTION['http'] = HTTP_WSGIInterceptorWithTimeout + SCHEME_TO_CONNECTION['https'] = HTTPS_WSGIInterceptorWithTimeout + +def uninstall(): + SCHEME_TO_CONNECTION['http'] = HTTPConnectionWithTimeout + SCHEME_TO_CONNECTION['https'] = HTTPSConnectionWithTimeout diff --git a/wsgi_intercept/httplib_intercept.py b/wsgi_intercept/httplib_intercept.py new file mode 100644 index 0000000..774dc6d --- /dev/null +++ b/wsgi_intercept/httplib_intercept.py @@ -0,0 +1,21 @@ + +"""intercept HTTP connections that use httplib + +(see wsgi_intercept/__init__.py for examples) + +""" + +import httplib +import wsgi_intercept +import sys +from httplib import ( + HTTPConnection as OriginalHTTPConnection, + HTTPSConnection as OriginalHTTPSConnection) + +def install(): + httplib.HTTPConnection = wsgi_intercept.WSGI_HTTPConnection + httplib.HTTPSConnection = wsgi_intercept.WSGI_HTTPSConnection + +def uninstall(): + httplib.HTTPConnection = OriginalHTTPConnection + httplib.HTTPSConnection = OriginalHTTPSConnection diff --git a/wsgi_intercept/urllib2_intercept/__init__.py b/wsgi_intercept/urllib2_intercept/__init__.py new file mode 100644 index 0000000..b499dd8 --- /dev/null +++ b/wsgi_intercept/urllib2_intercept/__init__.py @@ -0,0 +1,7 @@ + +"""intercept http requests made using the urllib2 module. + +(see wsgi_intercept/__init__.py for examples) + +""" +from wsgi_urllib2 import * \ No newline at end of file diff --git a/wsgi_intercept/urllib2_intercept/wsgi_urllib2.py b/wsgi_intercept/urllib2_intercept/wsgi_urllib2.py new file mode 100644 index 0000000..dcb6df3 --- /dev/null +++ b/wsgi_intercept/urllib2_intercept/wsgi_urllib2.py @@ -0,0 +1,62 @@ +import sys +from wsgi_intercept import WSGI_HTTPConnection + +import urllib2, httplib +from urllib2 import HTTPHandler +from httplib import HTTP + +# +# ugh, version dependence. +# + +if sys.version_info[:2] == (2, 3): + class WSGI_HTTP(HTTP): + _connection_class = WSGI_HTTPConnection + + class WSGI_HTTPHandler(HTTPHandler): + """ + Override the default HTTPHandler class with one that uses the + WSGI_HTTPConnection class to open HTTP URLs. + """ + def http_open(self, req): + return self.do_open(WSGI_HTTP, req) + + # I'm not implementing HTTPS for 2.3 until someone complains about it! -Kumar + WSGI_HTTPSHandler = None + +else: + class WSGI_HTTPHandler(HTTPHandler): + """ + Override the default HTTPHandler class with one that uses the + WSGI_HTTPConnection class to open HTTP URLs. + """ + def http_open(self, req): + return self.do_open(WSGI_HTTPConnection, req) + + if hasattr(httplib, 'HTTPS'): + # urllib2 does this check as well, I assume it's to see if + # python was compiled with SSL support + from wsgi_intercept import WSGI_HTTPSConnection + from urllib2 import HTTPSHandler + + class WSGI_HTTPSHandler(HTTPSHandler): + """ + Override the default HTTPSHandler class with one that uses the + WSGI_HTTPConnection class to open HTTPS URLs. + """ + def https_open(self, req): + return self.do_open(WSGI_HTTPSConnection, req) + else: + WSGI_HTTPSHandler = None + +def install_opener(): + handlers = [WSGI_HTTPHandler()] + if WSGI_HTTPSHandler is not None: + handlers.append(WSGI_HTTPSHandler()) + opener = urllib2.build_opener(*handlers) + urllib2.install_opener(opener) + + return opener + +def uninstall_opener(): + urllib2.install_opener(None)