Migrated test_urllib2 tests to stdlib directory, thereby removing copy-and-paste code.
This commit is contained in:
		| @@ -25,19 +25,24 @@ import os | ||||
| import errno | ||||
| import unittest | ||||
|  | ||||
| disabled_marker = '-*-*-*-*-*- disabled -*-*-*-*-*-' | ||||
| def exit_disabled(): | ||||
|     sys.exit(disabled_marker) | ||||
|  | ||||
| def exit_unless_twisted(): | ||||
|     from eventlet.api import get_hub | ||||
|     if 'Twisted' not in type(get_hub()).__name__: | ||||
|         exit_disabled() | ||||
|  | ||||
| def exit_unless_25(): | ||||
| def requires_25(func): | ||||
|     if sys.version_info[:2]<(2, 5): | ||||
|         exit_disabled() | ||||
|  | ||||
|         try: | ||||
|             from nose.plugins.skip import SkipTest | ||||
|             def skipme(*a, **k): | ||||
|                 raise SkipTest() | ||||
|             skipme.__name__ == func.__name__ | ||||
|             return skipme | ||||
|         except ImportError: | ||||
|             return lambda *a, **k: None | ||||
|     else: | ||||
|         return func | ||||
|          | ||||
| class LimitedTestCase(unittest.TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|   | ||||
							
								
								
									
										15
									
								
								tests/stdlib/test_urllib2.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								tests/stdlib/test_urllib2.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,15 @@ | ||||
| from test import test_urllib2 | ||||
|  | ||||
| from eventlet.green import socket | ||||
| from eventlet.green import urllib2 | ||||
| from eventlet.green.urllib2 import Request, OpenerDirector | ||||
|  | ||||
| test_urllib2.socket = socket | ||||
| test_urllib2.urllib2 = urllib2 | ||||
| test_urllib2.Request = Request | ||||
| test_urllib2.OpenerDirector = OpenerDirector | ||||
|  | ||||
| from test.test_urllib2 import * | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     test_main() | ||||
							
								
								
									
										24
									
								
								tests/stdlib/test_urllib2_localnet.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								tests/stdlib/test_urllib2_localnet.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| #!/usr/bin/env python | ||||
|  | ||||
| from eventlet.green import threading | ||||
| from eventlet.green import socket | ||||
| from eventlet.green import urllib2 | ||||
| from eventlet.green import BaseHTTPServer | ||||
|  | ||||
| # need to override these modules before import so | ||||
| # that classes inheriting from threading.Thread refer | ||||
| # to the correct parent class | ||||
| import sys | ||||
| sys.modules['threading'] = threading | ||||
| sys.modules['BaseHTTPServer'] = BaseHTTPServer | ||||
|  | ||||
| from test import test_urllib2_localnet | ||||
|  | ||||
| test_urllib2_localnet.socket = socket | ||||
| test_urllib2_localnet.urllib2 = urllib2 | ||||
| test_urllib2_localnet.BaseHTTPServer = BaseHTTPServer | ||||
|  | ||||
| from test.test_urllib2_localnet import * | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     test_main() | ||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -1,306 +0,0 @@ | ||||
| #!/usr/bin/env python | ||||
|  | ||||
| from tests import exit_unless_25; exit_unless_25() | ||||
|  | ||||
| import sys | ||||
| import urlparse | ||||
| import unittest | ||||
| import hashlib | ||||
| from tests import test_support | ||||
| from eventlet.green import threading | ||||
| from eventlet.green import socket | ||||
| from eventlet.green import urllib2 | ||||
| from eventlet.green import BaseHTTPServer | ||||
|  | ||||
|  | ||||
|  | ||||
| # Loopback http server infrastructure | ||||
|  | ||||
| class LoopbackHttpServer(BaseHTTPServer.HTTPServer): | ||||
|     """HTTP server w/ a few modifications that make it useful for | ||||
|     loopback testing purposes. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, server_address, RequestHandlerClass): | ||||
|         BaseHTTPServer.HTTPServer.__init__(self, | ||||
|                                            server_address, | ||||
|                                            RequestHandlerClass) | ||||
|  | ||||
|         # Set the timeout of our listening socket really low so | ||||
|         # that we can stop the server easily. | ||||
|         self.socket.settimeout(1.0) | ||||
|  | ||||
|     def get_request(self): | ||||
|         """BaseHTTPServer method, overridden.""" | ||||
|  | ||||
|         request, client_address = self.socket.accept() | ||||
|  | ||||
|         # It's a loopback connection, so setting the timeout | ||||
|         # really low shouldn't affect anything, but should make | ||||
|         # deadlocks less likely to occur. | ||||
|         request.settimeout(10.0) | ||||
|  | ||||
|         return (request, client_address) | ||||
|  | ||||
| class LoopbackHttpServerThread(threading.Thread): | ||||
|     """Stoppable thread that runs a loopback http server.""" | ||||
|  | ||||
|     def __init__(self, port, RequestHandlerClass): | ||||
|         threading.Thread.__init__(self) | ||||
|         self._RequestHandlerClass = RequestHandlerClass | ||||
|         self._stop = False | ||||
|         self._port = port | ||||
|         self._server_address = ('127.0.0.1', self._port) | ||||
|         self.ready = threading.Event() | ||||
|  | ||||
|     def stop(self): | ||||
|         """Stops the webserver if it's currently running.""" | ||||
|  | ||||
|         # Set the stop flag. | ||||
|         self._stop = True | ||||
|  | ||||
|         self.join() | ||||
|  | ||||
|     def run(self): | ||||
|         protocol = "HTTP/1.0" | ||||
|  | ||||
|         self._RequestHandlerClass.protocol_version = protocol | ||||
|         httpd = LoopbackHttpServer(self._server_address, | ||||
|                                    self._RequestHandlerClass) | ||||
|  | ||||
|         sa = httpd.socket.getsockname() | ||||
|         #print "Serving HTTP on", sa[0], "port", sa[1], "..." | ||||
|  | ||||
|         self.ready.set() | ||||
|         while not self._stop: | ||||
|             httpd.handle_request() | ||||
|  | ||||
| # Authentication infrastructure | ||||
|  | ||||
| class DigestAuthHandler: | ||||
|     """Handler for performing digest authentication.""" | ||||
|  | ||||
|     def __init__(self): | ||||
|         self._request_num = 0 | ||||
|         self._nonces = [] | ||||
|         self._users = {} | ||||
|         self._realm_name = "Test Realm" | ||||
|         self._qop = "auth" | ||||
|  | ||||
|     def set_qop(self, qop): | ||||
|         self._qop = qop | ||||
|  | ||||
|     def set_users(self, users): | ||||
|         assert isinstance(users, dict) | ||||
|         self._users = users | ||||
|  | ||||
|     def set_realm(self, realm): | ||||
|         self._realm_name = realm | ||||
|  | ||||
|     def _generate_nonce(self): | ||||
|         self._request_num += 1 | ||||
|         nonce = hashlib.md5(str(self._request_num)).hexdigest() | ||||
|         self._nonces.append(nonce) | ||||
|         return nonce | ||||
|  | ||||
|     def _create_auth_dict(self, auth_str): | ||||
|         first_space_index = auth_str.find(" ") | ||||
|         auth_str = auth_str[first_space_index+1:] | ||||
|  | ||||
|         parts = auth_str.split(",") | ||||
|  | ||||
|         auth_dict = {} | ||||
|         for part in parts: | ||||
|             name, value = part.split("=") | ||||
|             name = name.strip() | ||||
|             if value[0] == '"' and value[-1] == '"': | ||||
|                 value = value[1:-1] | ||||
|             else: | ||||
|                 value = value.strip() | ||||
|             auth_dict[name] = value | ||||
|         return auth_dict | ||||
|  | ||||
|     def _validate_auth(self, auth_dict, password, method, uri): | ||||
|         final_dict = {} | ||||
|         final_dict.update(auth_dict) | ||||
|         final_dict["password"] = password | ||||
|         final_dict["method"] = method | ||||
|         final_dict["uri"] = uri | ||||
|         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict | ||||
|         HA1 = hashlib.md5(HA1_str).hexdigest() | ||||
|         HA2_str = "%(method)s:%(uri)s" % final_dict | ||||
|         HA2 = hashlib.md5(HA2_str).hexdigest() | ||||
|         final_dict["HA1"] = HA1 | ||||
|         final_dict["HA2"] = HA2 | ||||
|         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ | ||||
|                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict | ||||
|         response = hashlib.md5(response_str).hexdigest() | ||||
|  | ||||
|         return response == auth_dict["response"] | ||||
|  | ||||
|     def _return_auth_challenge(self, request_handler): | ||||
|         request_handler.send_response(407, "Proxy Authentication Required") | ||||
|         request_handler.send_header("Content-Type", "text/html") | ||||
|         request_handler.send_header( | ||||
|             'Proxy-Authenticate', 'Digest realm="%s", ' | ||||
|             'qop="%s",' | ||||
|             'nonce="%s", ' % \ | ||||
|             (self._realm_name, self._qop, self._generate_nonce())) | ||||
|         # XXX: Not sure if we're supposed to add this next header or | ||||
|         # not. | ||||
|         #request_handler.send_header('Connection', 'close') | ||||
|         request_handler.end_headers() | ||||
|         request_handler.wfile.write("Proxy Authentication Required.") | ||||
|         return False | ||||
|  | ||||
|     def handle_request(self, request_handler): | ||||
|         """Performs digest authentication on the given HTTP request | ||||
|         handler.  Returns True if authentication was successful, False | ||||
|         otherwise. | ||||
|  | ||||
|         If no users have been set, then digest auth is effectively | ||||
|         disabled and this method will always return True. | ||||
|         """ | ||||
|  | ||||
|         if len(self._users) == 0: | ||||
|             return True | ||||
|  | ||||
|         if not request_handler.headers.has_key('Proxy-Authorization'): | ||||
|             return self._return_auth_challenge(request_handler) | ||||
|         else: | ||||
|             auth_dict = self._create_auth_dict( | ||||
|                 request_handler.headers['Proxy-Authorization'] | ||||
|                 ) | ||||
|             if self._users.has_key(auth_dict["username"]): | ||||
|                 password = self._users[ auth_dict["username"] ] | ||||
|             else: | ||||
|                 return self._return_auth_challenge(request_handler) | ||||
|             if not auth_dict.get("nonce") in self._nonces: | ||||
|                 return self._return_auth_challenge(request_handler) | ||||
|             else: | ||||
|                 self._nonces.remove(auth_dict["nonce"]) | ||||
|  | ||||
|             auth_validated = False | ||||
|  | ||||
|             # MSIE uses short_path in its validation, but Python's | ||||
|             # urllib2 uses the full path, so we're going to see if | ||||
|             # either of them works here. | ||||
|  | ||||
|             for path in [request_handler.path, request_handler.short_path]: | ||||
|                 if self._validate_auth(auth_dict, | ||||
|                                        password, | ||||
|                                        request_handler.command, | ||||
|                                        path): | ||||
|                     auth_validated = True | ||||
|  | ||||
|             if not auth_validated: | ||||
|                 return self._return_auth_challenge(request_handler) | ||||
|             return True | ||||
|  | ||||
| # Proxy test infrastructure | ||||
|  | ||||
| class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): | ||||
|     """This is a 'fake proxy' that makes it look like the entire | ||||
|     internet has gone down due to a sudden zombie invasion.  It main | ||||
|     utility is in providing us with authentication support for | ||||
|     testing. | ||||
|     """ | ||||
|  | ||||
|     digest_auth_handler = DigestAuthHandler() | ||||
|  | ||||
|     def log_message(self, format, *args): | ||||
|         # Uncomment the next line for debugging. | ||||
|         #sys.stderr.write(format % args) | ||||
|         pass | ||||
|  | ||||
|     def do_GET(self): | ||||
|         (scm, netloc, path, params, query, fragment) = urlparse.urlparse( | ||||
|             self.path, 'http') | ||||
|         self.short_path = path | ||||
|         if self.digest_auth_handler.handle_request(self): | ||||
|             self.send_response(200, "OK") | ||||
|             self.send_header("Content-Type", "text/html") | ||||
|             self.end_headers() | ||||
|             self.wfile.write("You've reached %s!<BR>" % self.path) | ||||
|             self.wfile.write("Our apologies, but our server is down due to " | ||||
|                               "a sudden zombie invasion.") | ||||
|  | ||||
| # Test cases | ||||
|  | ||||
| class ProxyAuthTests(unittest.TestCase): | ||||
|     URL = "http://www.foo.com" | ||||
|  | ||||
|     PORT = 8080 | ||||
|     USER = "tester" | ||||
|     PASSWD = "test123" | ||||
|     REALM = "TestRealm" | ||||
|  | ||||
|     PROXY_URL = "http://127.0.0.1:%d" % PORT | ||||
|  | ||||
|     def setUp(self): | ||||
|         FakeProxyHandler.digest_auth_handler.set_users({ | ||||
|             self.USER : self.PASSWD | ||||
|             }) | ||||
|         FakeProxyHandler.digest_auth_handler.set_realm(self.REALM) | ||||
|  | ||||
|         self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler) | ||||
|         self.server.start() | ||||
|         self.server.ready.wait() | ||||
|  | ||||
|         handler = urllib2.ProxyHandler({"http" : self.PROXY_URL}) | ||||
|         self._digest_auth_handler = urllib2.ProxyDigestAuthHandler() | ||||
|         self.opener = urllib2.build_opener(handler, self._digest_auth_handler) | ||||
|  | ||||
|     def tearDown(self): | ||||
|         self.server.stop() | ||||
|  | ||||
|     def test_proxy_with_bad_password_raises_httperror(self): | ||||
|         self._digest_auth_handler.add_password(self.REALM, self.URL, | ||||
|                                                self.USER, self.PASSWD+"bad") | ||||
|         FakeProxyHandler.digest_auth_handler.set_qop("auth") | ||||
|         self.assertRaises(urllib2.HTTPError, | ||||
|                           self.opener.open, | ||||
|                           self.URL) | ||||
|  | ||||
|     def test_proxy_with_no_password_raises_httperror(self): | ||||
|         FakeProxyHandler.digest_auth_handler.set_qop("auth") | ||||
|         self.assertRaises(urllib2.HTTPError, | ||||
|                           self.opener.open, | ||||
|                           self.URL) | ||||
|  | ||||
|     def test_proxy_qop_auth_works(self): | ||||
|         self._digest_auth_handler.add_password(self.REALM, self.URL, | ||||
|                                                self.USER, self.PASSWD) | ||||
|         FakeProxyHandler.digest_auth_handler.set_qop("auth") | ||||
|         result = self.opener.open(self.URL) | ||||
|         while result.read(): | ||||
|             pass | ||||
|         result.close() | ||||
|  | ||||
|     def test_proxy_qop_auth_int_works_or_throws_urlerror(self): | ||||
|         self._digest_auth_handler.add_password(self.REALM, self.URL, | ||||
|                                                self.USER, self.PASSWD) | ||||
|         FakeProxyHandler.digest_auth_handler.set_qop("auth-int") | ||||
|         try: | ||||
|             result = self.opener.open(self.URL) | ||||
|         except urllib2.URLError: | ||||
|             # It's okay if we don't support auth-int, but we certainly | ||||
|             # shouldn't receive any kind of exception here other than | ||||
|             # a URLError. | ||||
|             result = None | ||||
|         if result: | ||||
|             while result.read(): | ||||
|                 pass | ||||
|             result.close() | ||||
|  | ||||
| def test_main(): | ||||
|     # We will NOT depend on the network resource flag | ||||
|     # (Lib/test/regrtest.py -u network) since all tests here are only | ||||
|     # localhost.  However, if this is a bad rationale, then uncomment | ||||
|     # the next line. | ||||
|     #test_support.requires("network") | ||||
|  | ||||
|     test_support.run_unittest(ProxyAuthTests) | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     test_main() | ||||
		Reference in New Issue
	
	Block a user
	 Ryan Williams
					Ryan Williams