From 9721f6a698ca887445083512d7c0979a292073b8 Mon Sep 17 00:00:00 2001 From: donovan Date: Wed, 24 Dec 2008 12:05:45 -0800 Subject: [PATCH 1/6] remove unused and non-working copy of gthreadless --- eventlet/gthreadless.py | 135 ---------------------------------------- 1 file changed, 135 deletions(-) delete mode 100644 eventlet/gthreadless.py diff --git a/eventlet/gthreadless.py b/eventlet/gthreadless.py deleted file mode 100644 index 9701ab4..0000000 --- a/eventlet/gthreadless.py +++ /dev/null @@ -1,135 +0,0 @@ -from eventlet.support import greenlet -greenlet.main = greenlet.getcurrent() # WTF did greenlet.main go? -from twisted.internet import defer, reactor - -def _desc(g): - if isinstance(g, DebugGreenlet): - if hasattr(g, 'name'): - desc = "<%s %s" % (g.name, hex(id(g))) - else: - desc = " %s" % (_desc(current), _desc(self)) - return super(DebugGreenlet, self).switch(*args, **kwargs) - -def deferredGreenlet(func): - """ - I am a function decorator for functions that call blockOn. The - function I return will call the original function inside of a - greenlet, and return a Deferred. - - TODO: Do a hack so the name of 'replacement' is the name of 'func'. - """ - def replacement(*args, **kwargs): - d = defer.Deferred() - def greenfunc(*args, **kwargs): - try: - d.callback(func(*args, **kwargs)) - except: - d.errback() - g = greenlet.greenlet(greenfunc) - crap = g.switch(*args, **kwargs) - return d - return replacement - -class CalledFromMain(Exception): - pass - -class _IAmAnException(object): - def __init__(self, f): - self.f = f - -def blockOn(d, desc=None): - """ - Use me in non-main greenlets to wait for a Deferred to fire. - """ - g = greenlet.getcurrent() - if g is greenlet.main: - raise CalledFromMain("You cannot call blockOn from the main greenlet.") - - ## Note ## - # Notice that this code catches and ignores GreenletExit. The - # greenlet mechanism sends a GreenletExit at a blocking greenlet if - # there is no chance that the greenlet will be fired by anyone - # else -- that is, no other greenlets have a reference to the one - # that's blocking. - - # This is often the case with blockOn. When someone blocks on a - # Deferred, these callbacks are added to it. When the deferred - # fires, we make the blockOn() call finish -- we resume the - # blocker. At that point, the Deferred chain is irrelevant; it - # makes no sense for any other callbacks to be called. The - # Deferred, then, will likely be garbage collected and thus all - # references to our greenlet will be lost -- and thus it will have - # GreenletExit fired. - - def cb(r): - try: - # This callback might be fired immediately when added - # and switching to the current greenlet seems to do nothing - # (ie. we will never actually return to the function we called - # blockOn from), so we make the call happen later in the main greenlet - # instead, if the current greenlet is the same as the one we are swithcing - # to. - - if g == greenlet.getcurrent(): - reactor.callLater(0, g.switch, r) - else: - g.switch(r) - except greenlet.GreenletExit: - pass - def eb(f): - try: - g.switch(_IAmAnException(f)) - except greenlet.GreenletExit: - pass - - d.addCallbacks(cb, eb) - - x = g.parent.switch() - if isinstance(x, _IAmAnException): - x.f.raiseException() - return x - - -class GreenletWrapper(object): - """Wrap an object which presents an asynchronous interface (via Deferreds). - - The wrapped object will present the same interface, but all methods will - return results, rather than Deferreds. - - When a Deferred would otherwise be returned, a greenlet is created and then - control is switched back to the main greenlet. When the Deferred fires, - control is switched back to the created greenlet and execution resumes with - the result. - """ - - def __init__(self, wrappee): - self.wrappee = wrappee - - def __getattribute__(self, name): - wrappee = super(GreenletWrapper, self).__getattribute__('wrappee') - original = getattr(wrappee, name) - if callable(original): - def wrapper(*a, **kw): - result = original(*a, **kw) - if isinstance(result, defer.Deferred): - return blockOn(result) - return result - return wrapper - return original - From 7e65420bb367dff3c6f01f0e54438e0d2393a5ca Mon Sep 17 00:00:00 2001 From: donovan Date: Wed, 24 Dec 2008 12:52:50 -0800 Subject: [PATCH 2/6] remove channel. use coros.queue instead. --- eventlet/channel.py | 102 -------------------------------------------- 1 file changed, 102 deletions(-) delete mode 100644 eventlet/channel.py diff --git a/eventlet/channel.py b/eventlet/channel.py deleted file mode 100644 index 32be5ae..0000000 --- a/eventlet/channel.py +++ /dev/null @@ -1,102 +0,0 @@ -"""\ -@file channel.py -@author Bob Ippolito - -Copyright (c) 2005-2006, Bob Ippolito -Copyright (c) 2007, Linden Research, Inc. -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -import collections - -from eventlet import api -from eventlet.support import greenlet - -__all__ = ['channel'] - -class channel(object): - """A channel is a control flow primitive for co-routines. It is a - "thread-like" queue for controlling flow between two (or more) co-routines. - The state model is: - - * If one co-routine calls send(), it is unscheduled until another - co-routine calls receive(). - * If one co-rounte calls receive(), it is unscheduled until another - co-routine calls send(). - * Once a paired send()/receive() have been called, both co-routeines - are rescheduled. - - This is similar to: http://stackless.com/wiki/Channels - """ - balance = 0 - - def _tasklet_loop(self): - deque = self.deque = collections.deque() - hub = api.get_hub() - current = greenlet.getcurrent() - def switch(g, value=None, exc=None): - if exc is None: - return g.switch(value) - else: - return g.throw(exc) - direction, caller, args = switch(current.parent or current) - try: - while True: - if direction == -1: - # waiting to receive - if self.balance > 0: - sender, args = deque.popleft() - hub.schedule_call(0, switch, sender) - hub.schedule_call(0, switch, caller, *args) - else: - deque.append(caller) - else: - # waiting to send - if self.balance < 0: - receiver = deque.popleft() - hub.schedule_call(0, switch, receiver, *args) - hub.schedule_call(0, switch, caller) - else: - deque.append((caller, args)) - self.balance += direction - direction, caller, args = hub.switch() - finally: - deque.clear() - del self.deque - self.balance = 0 - - def _send_tasklet(self, *args): - try: - t = self._tasklet - except AttributeError: - t = self._tasklet = greenlet.greenlet(self._tasklet_loop) - t.switch() - if args: - return t.switch((1, greenlet.getcurrent(), args)) - else: - return t.switch((-1, greenlet.getcurrent(), args)) - - def receive(self): - return self._send_tasklet() - - def send(self, value): - return self._send_tasklet(value) - - def send_exception(self, exc): - return self._send_tasklet(None, exc) From 277698b4d55de6f2e6f65d26e1c70ffd324c15c3 Mon Sep 17 00:00:00 2001 From: donovan Date: Wed, 24 Dec 2008 13:01:50 -0800 Subject: [PATCH 3/6] Remove httpd. Use eventlet.wsgi instead. --- eventlet/httpd.py | 605 ---------------------------------------------- 1 file changed, 605 deletions(-) delete mode 100644 eventlet/httpd.py diff --git a/eventlet/httpd.py b/eventlet/httpd.py deleted file mode 100644 index 18648b9..0000000 --- a/eventlet/httpd.py +++ /dev/null @@ -1,605 +0,0 @@ -"""\ -@file httpd.py -@author Donovan Preston - -Copyright (c) 2005-2006, Donovan Preston -Copyright (c) 2007, Linden Research, Inc. -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -import cgi -import errno -from eventlet.green import socket -import sys -import time -from eventlet.green import urllib -import traceback -from eventlet.green import BaseHTTPServer - -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO - -from eventlet import api -from eventlet import coros - - -DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' - -USE_ACCESS_LOG = True - - -CONNECTION_CLOSED = (errno.EPIPE, errno.ECONNRESET) - - -class ErrorResponse(Exception): - _responses = BaseHTTPServer.BaseHTTPRequestHandler.responses - - def __init__(self, code, reason_phrase=None, headers=None, body=None): - Exception.__init__(self, reason_phrase) - self.code = code - if reason_phrase is None: - self.reason = self._responses[code][0] - else: - self.reason = reason_phrase - self.headers = headers - if body is None: - self.body = self._responses[code][1] - else: - self.body = body - - -class Request(object): - _method = None - _path = None - _responsecode = 200 - _reason_phrase = None - _request_started = False - _chunked = False - _producer_adapters = {} - depth = 0 - def __init__(self, protocol, method, path, headers): - self.context = {} - self.request_start_time = time.time() - self.site = protocol.server.site - self.protocol = protocol - self._method = method - if '?' in path: - self._path, self._query = path.split('?', 1) - self._query = self._query.replace('&', '&') - else: - self._path, self._query = path, None - self._incoming_headers = headers - self._outgoing_headers = dict() - - def response(self, code, reason_phrase=None, headers=None, body=None): - """Change the response code. This will not be sent until some - data is written; last call to this method wins. Default is - 200 if this is not called. - """ - self._responsecode = code - self._reason_phrase = reason_phrase - self.protocol.set_response_code(self, code, reason_phrase) - if headers is not None: - try: - headers = headers.iteritems() - except AttributeError: - pass - for key, value in headers: - self.set_header(key, value) - if body is not None: - self.write(body) - - def is_okay(self): - return 200 <= self._responsecode <= 299 - - def full_url(self): - path = self.path() - query = self.query() - if query: - path = path + '?' + query - - via = self.get_header('via', '') - if via.strip(): - next_part = iter(via.split()).next - - received_protocol = next_part() - received_by = next_part() - if received_by.endswith(','): - received_by = received_by[:-1] - else: - comment = '' - while not comment.endswith(','): - try: - comment += next_part() - except StopIteration: - comment += ',' - break - comment = comment[:-1] - else: - received_by = self.get_header('host') - - return '%s://%s%s' % (self.request_protocol(), received_by, path) - - def begin_response(self, length="-"): - """Begin the response, and return the initial response text - """ - self._request_started = True - request_time = time.time() - self.request_start_time - - code = self._responsecode - proto = self.protocol - - if USE_ACCESS_LOG: - proto.server.write_access_log_line( - proto.client_address[0], - time.strftime("%d/%b/%Y %H:%M:%S"), - proto.requestline, - code, - length, - request_time) - - if self._reason_phrase is not None: - message = self._reason_phrase.split("\n")[0] - elif code in proto.responses: - message = proto.responses[code][0] - else: - message = '' - if proto.request_version == 'HTTP/0.9': - return [] - - response_lines = proto.generate_status_line() - - if not self._outgoing_headers.has_key('connection'): - con = self.get_header('connection') - if con is None and proto.request_version == 'HTTP/1.0': - con = 'close' - if con is not None: - self.set_header('connection', con) - - for key, value in self._outgoing_headers.items(): - key = '-'.join([x.capitalize() for x in key.split('-')]) - response_lines.append("%s: %s" % (key, value)) - - response_lines.append("") - return response_lines - - def write(self, obj): - """Writes an arbitrary object to the response, using - the sitemap's adapt method to convert it to bytes. - """ - if isinstance(obj, str): - self._write_bytes(obj) - elif isinstance(obj, unicode): - # use utf8 encoding for now, *TODO support charset negotiation - # Content-Type: text/html; charset=utf-8 - ctype = self._outgoing_headers.get('content-type', 'text/html') - ctype = ctype + '; charset=utf-8' - self._outgoing_headers['content-type'] = ctype - self._write_bytes(obj.encode('utf8')) - else: - self.site.adapt(obj, self) - - def _write_bytes(self, data): - """Write all the data of the response. - Can be called just once. - """ - if self._request_started: - print "Request has already written a response:" - traceback.print_stack() - return - - self._outgoing_headers['content-length'] = len(data) - - response_lines = self.begin_response(len(data)) - response_lines.append(data) - self.protocol.wfile.write("\r\n".join(response_lines)) - if hasattr(self.protocol.wfile, 'flush'): - self.protocol.wfile.flush() - - def method(self): - return self._method - - def path(self): - return self._path - - def path_segments(self): - return [urllib.unquote_plus(x) for x in self._path.split('/')[1:]] - - def query(self): - return self._query - - def uri(self): - if self._query: - return '%s?%s' % ( - self._path, self._query) - return self._path - - def get_headers(self): - return self._incoming_headers - - def get_header(self, header_name, default=None): - return self.get_headers().get(header_name.lower(), default) - - def get_query_pairs(self): - if not hasattr(self, '_split_query'): - if self._query is None: - self._split_query = () - else: - spl = self._query.split('&') - spl = [x.split('=', 1) for x in spl if x] - self._split_query = [] - for query in spl: - if len(query) == 1: - key = query[0] - value = '' - else: - key, value = query - self._split_query.append((urllib.unquote_plus(key), urllib.unquote_plus(value))) - - return self._split_query - - def get_queries_generator(self, name): - """Generate all query parameters matching the given name. - """ - for key, value in self.get_query_pairs(): - if key == name or not name: - yield value - - get_queries = lambda self, name: list(self.get_queries_generator) - - def get_query(self, name, default=None): - try: - return self.get_queries_generator(name).next() - except StopIteration: - return default - - def get_arg_list(self, name): - return self.get_field_storage().getlist(name) - - def get_arg(self, name, default=None): - return self.get_field_storage().getfirst(name, default) - - def get_field_storage(self): - if not hasattr(self, '_field_storage'): - if self.method() == 'GET': - data = '' - if self._query: - data = self._query - else: - data = self.read_body() - fl = StringIO(data) - ## Allow our resource to provide the FieldStorage instance for - ## customization purposes. - headers = self.get_headers() - environ = dict( - REQUEST_METHOD='POST', - QUERY_STRING=self._query or '') - - self._field_storage = cgi.FieldStorage(fl, headers, environ=environ) - - return self._field_storage - - def set_header(self, key, value): - if key.lower() == 'connection' and value.lower() == 'close': - self.protocol.close_connection = 1 - self._outgoing_headers[key.lower()] = value - __setitem__ = set_header - - def get_outgoing_header(self, key): - return self._outgoing_headers[key.lower()] - - def has_outgoing_header(self, key): - return self._outgoing_headers.has_key(key.lower()) - - def socket(self): - return self.protocol.socket - - def error(self, response=None, body=None, log_traceback=True): - if log_traceback: - traceback.print_exc(file=self.log) - if response is None: - response = 500 - if body is None: - typ, val, tb = sys.exc_info() - body = dict(type=str(typ), error=True, reason=str(val)) - self.response(response) - if(type(body) is str and not self.response_written()): - self.write(body) - return - try: - produce(body, self) - except Exception, e: - traceback.print_exc(file=self.log) - if not self.response_written(): - self.write('Internal Server Error') - - def not_found(self): - self.error(404, 'Not Found\n', log_traceback=False) - - def raw_body(self): - if not hasattr(self, '_cached_body'): - self.read_body() - return self._cached_body - - def read_body(self): - """ Returns the string body that was read off the request, or - the empty string if there was no request body. - - Requires a content-length header. Caches the body so multiple - calls to read_body() are free. - """ - if not hasattr(self, '_cached_body'): - length = self.get_header('content-length') - if length: - length = int(length) - if length: - self._cached_body = self.protocol.rfile.read(length) - else: - self._cached_body = '' - return self._cached_body - - def parsed_body(self): - """ Returns the parsed version of the body, using the - content-type header to select from the parsers on the site - object. - - If no parser is found, returns the string body from - read_body(). Caches the parsed body so multiple calls to - parsed_body() are free. - """ - if not hasattr(self, '_cached_parsed_body'): - body = self.read_body() - if hasattr(self.site, 'parsers'): - ct = self.get_header('content-type') - parser = self.site.parsers.get(ct) - - if parser is not None: - body = parser(body) - else: - ex = ValueError("Could not find parser for content-type: %s" % ct) - ex.body = body - raise ex - self._cached_parsed_body = body - return self._cached_parsed_body - - def override_body(self, body): - if not hasattr(self, '_cached_parsed_body'): - self.read_body() ## Read and discard body - self._cached_parsed_body = body - - def response_written(self): - ## TODO change badly named variable - return self._request_started - - def request_version(self): - return self.protocol.request_version - - def request_protocol(self): - if self.protocol.is_secure: - return "https" - return "http" - - def server_address(self): - return self.protocol.server.address - - def __repr__(self): - return "" % ( - getattr(self, '_method'), getattr(self, '_path')) - -DEFAULT_TIMEOUT = 300 - -# This value was chosen because apache 2 has a default limit of 8190. -# I believe that slightly smaller number is because apache does not -# count the \r\n. -MAX_REQUEST_LINE = 8192 - -class Timeout(RuntimeError): - pass - -class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): - def __init__(self, request, client_address, server): - self.rfile = self.wfile = request.makeGreenFile() - self.is_secure = request.is_secure - request.close() # close this now so that when rfile and wfile are closed, the socket gets closed - self.client_address = client_address - self.server = server - self.set_response_code(None, 200, None) - self.protocol_version = server.max_http_version - - def close(self): - self.rfile.close() - self.wfile.close() - - def set_response_code(self, request, code, message): - self._code = code - if message is not None: - self._message = message.split("\n")[0] - elif code in self.responses: - self._message = self.responses[code][0] - else: - self._message = '' - - def generate_status_line(self): - return [ - "%s %d %s" % ( - self.protocol_version, self._code, self._message)] - - def write_bad_request(self, status, reason): - self.set_response_code(self, status, reason) - self.wfile.write(''.join(self.generate_status_line())) - self.wfile.write('\r\nServer: %s\r\n' % self.version_string()) - self.wfile.write('Date: %s\r\n' % self.date_time_string()) - self.wfile.write('Content-Length: 0\r\n\r\n') - - def handle(self): - self.close_connection = 0 - - timeout = DEFAULT_TIMEOUT - while not self.close_connection: - if timeout == 0: - break - cancel = api.exc_after(timeout, Timeout) - try: - self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE) - if self.raw_requestline is not None: - if len(self.raw_requestline) == MAX_REQUEST_LINE: - # Someone sent a request line which is too - # large. Be helpful and tell them. - self.write_bad_request(414, 'Request-URI Too Long') - self.close_connection = True - continue - except socket.error, e: - if e[0] in CONNECTION_CLOSED: - self.close_connection = True - cancel.cancel() - continue - except Timeout: - self.close_connection = True - continue - except Exception, e: - try: - if e[0][0][0].startswith('SSL'): - print "SSL Error:", e[0][0] - self.close_connection = True - cancel.cancel() - continue - except Exception, f: - print "Exception in ssl test:",f - pass - raise e - cancel.cancel() - - if not self.raw_requestline or not self.parse_request(): - self.close_connection = True - continue - - self.set_response_code(None, 200, None) - request = Request(self, self.command, self.path, self.headers) - request.set_header('Server', self.version_string()) - request.set_header('Date', self.date_time_string()) - try: - timeout = int(request.get_header('keep-alive', timeout)) - except TypeError, ValueError: - pass - - try: - try: - try: - self.server.site.handle_request(request) - except ErrorResponse, err: - request.response(code=err.code, - reason_phrase=err.reason, - headers=err.headers, - body=err.body) - finally: - # clean up any timers that might have been left around by the handling code - api.get_hub().cancel_timers(api.getcurrent()) - - # throw an exception if it failed to write a body - if not request.response_written(): - raise NotImplementedError("Handler failed to write response to request: %s" % request) - - if not hasattr(self, '_cached_body'): - try: - request.read_body() ## read & discard body - except: - pass - - except socket.error, e: - # Broken pipe, connection reset by peer - if e[0] in CONNECTION_CLOSED: - #print "Remote host closed connection before response could be sent" - pass - else: - raise - except Exception, e: - self.server.log_message("Exception caught in HttpRequest.handle():\n") - self.server.log_exception(*sys.exc_info()) - if not request.response_written(): - request.response(500) - request.write('Internal Server Error') - self.close() - raise e # can't do a plain raise since exc_info might have been cleared - self.close() - - -class Server(BaseHTTPServer.HTTPServer): - def __init__(self, socket, address, site, log, max_http_version=DEFAULT_MAX_HTTP_VERSION): - self.socket = socket - self.address = address - self.site = site - self.max_http_version = max_http_version - if log: - self.log = log - if hasattr(log, 'info'): - log.write = log.info - else: - self.log = self - - def write(self, something): - sys.stdout.write('%s' % (something, )); sys.stdout.flush() - - def log_message(self, message): - self.log.write(message) - - def log_exception(self, type, value, tb): - self.log.write(''.join(traceback.format_exception(type, value, tb))) - - def write_access_log_line(self, *args): - """Write a line to the access.log. Arguments: - client_address, date_time, requestline, code, size, request_time - """ - self.log.write( - '%s - - [%s] "%s" %s %s %.6f\n' % args) - - -def server(sock, site, log=None, max_size=512, serv=None, max_http_version=DEFAULT_MAX_HTTP_VERSION): - pool = coros.CoroutinePool(max_size=max_size) - if serv is None: - serv = Server(sock, sock.getsockname(), site, log, max_http_version=max_http_version) - try: - serv.log.write("httpd starting up on %s\n" % (sock.getsockname(), )) - while True: - try: - new_sock, address = sock.accept() - proto = HttpProtocol(new_sock, address, serv) - pool.execute_async(proto.handle) - api.sleep(0) # sleep to allow other coros to run - except KeyboardInterrupt: - api.get_hub().remove_descriptor(sock.fileno()) - serv.log.write("httpd exiting\n") - break - finally: - try: - sock.close() - except socket.error: - pass - - -if __name__ == '__main__': - class TestSite(object): - def handle_request(self, req): - req.write('hello') - - server( - api.tcp_listener(('127.0.0.1', 8080)), - TestSite()) - From 6ab1ce5247c35e13f7bb3a63f767a738aa6b94a2 Mon Sep 17 00:00:00 2001 From: donovan Date: Wed, 24 Dec 2008 13:06:50 -0800 Subject: [PATCH 4/6] remove unused bench file. wtf --- eventlet/bench.py | 40 ---------------------------------------- 1 file changed, 40 deletions(-) delete mode 100644 eventlet/bench.py diff --git a/eventlet/bench.py b/eventlet/bench.py deleted file mode 100644 index 081b10a..0000000 --- a/eventlet/bench.py +++ /dev/null @@ -1,40 +0,0 @@ -import collections, time, Queue - -qt = 10000 - -l1 = collections.deque() -l2 = [] -l3 = Queue.Queue() - -start = time.time() -for i in range(1,qt): - l1.append(i) - -for i in range(1,qt): - l1.popleft() - -mid = time.time() - -for i in range(1,qt): - l2.append(i) - -for i in range(1,qt): - l2.pop(0) - -mid2 = time.time() - -for i in range(1,qt): - l3.put_nowait(i) - -for i in range(1,qt): - l3.get_nowait() - -end = time.time() - -dtime = mid - start -ltime = mid2 - mid -qtime = end - mid2 - -print "deque:", dtime -print " list:", ltime -print "queue:", qtime \ No newline at end of file From 0aa029f44df41cf06f60881886f8758d91a642de Mon Sep 17 00:00:00 2001 From: donovan Date: Wed, 24 Dec 2008 14:44:51 -0800 Subject: [PATCH 5/6] remove redundant selecthub; selects is the right module --- eventlet/hubs/selecthub.py | 60 -------------------------------------- 1 file changed, 60 deletions(-) delete mode 100644 eventlet/hubs/selecthub.py diff --git a/eventlet/hubs/selecthub.py b/eventlet/hubs/selecthub.py deleted file mode 100644 index fac226e..0000000 --- a/eventlet/hubs/selecthub.py +++ /dev/null @@ -1,60 +0,0 @@ -"""\ -@file selecthub.py - -Copyright (c) 2005-2006, Bob Ippolito -Copyright (c) 2007, Linden Research, Inc. -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -import sys -import select -import errno -import time - -from eventlet.hubs import hub -from eventlet.support import greenlet - -class Hub(hub.BaseHub): - def wait(self, seconds=None): - readers = self.readers - writers = self.writers - excs = self.excs - if not readers and not writers and not excs: - if seconds: - time.sleep(seconds) - return - try: - r, w, ig = select.select(readers.keys(), writers.keys(), [], seconds) - except select.error, e: - if e.args[0] == errno.EINTR: - return - raise - SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS - for observed, events in ((readers, r), (writers, w)): - #print "events", r, " ", w - for fileno in events: - try: - cb = observed.get(fileno) - #print "cb", cb, " ", observed - if cb is not None: - cb(fileno) - except SYSTEM_EXCEPTIONS: - raise - except: - self.squelch_exception(fileno, sys.exc_info()) From 01670ce0724c303be35f9494074a84149ccf8a22 Mon Sep 17 00:00:00 2001 From: donovan Date: Mon, 29 Dec 2008 16:10:34 -0800 Subject: [PATCH 6/6] Rename the eventlet.support.greenlet module to greenlets to avoid clashing with the pypi greenlet distribution; support the pypi greenlet distribution; port httpc_test to use wsgi instead of httpd; add channel back in because I couldn't get pool to work properly with queue yet --- .hgignore | 4 + eventlet/api.py | 8 +- eventlet/channel.py | 102 +++++++++++++++++ eventlet/coros.py | 2 + eventlet/green/thread.py | 2 +- eventlet/greenlib.py | 2 +- eventlet/hubs/hub.py | 2 +- eventlet/hubs/libev.py | 2 +- eventlet/hubs/libevent.py | 2 +- eventlet/hubs/selects.py | 2 +- eventlet/pools.py | 49 -------- eventlet/support/greenlets.py | 22 ++++ eventlet/twistedutil/join_reactor.py | 2 +- eventlet/util.py | 2 +- greentest/coros_test.py | 1 + greentest/httpc_test.py | 163 ++++++++++++++------------- greentest/pools_test.py | 56 +++------ greentest/processes_test.py | 1 - greentest/saranwrap_test.py | 6 +- 19 files changed, 252 insertions(+), 178 deletions(-) create mode 100644 eventlet/channel.py create mode 100644 eventlet/support/greenlets.py diff --git a/.hgignore b/.hgignore index 1c118cd..5ece4c6 100644 --- a/.hgignore +++ b/.hgignore @@ -6,3 +6,7 @@ dist eventlet.egg-info build htmlreports +*.esproj +.DS_Store +results.*.db + diff --git a/eventlet/api.py b/eventlet/api.py index 2bbd09c..4ae98fd 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -30,7 +30,7 @@ import linecache import inspect import traceback -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet import tls __all__ = [ @@ -40,6 +40,12 @@ __all__ = [ 'unspew', 'use_hub', 'with_timeout', 'timeout'] +def switch(coro, result=None, exc=None): + if exc is not None: + return coro.throw(exc) + return coro.switch(result) + + class TimeoutError(Exception): """Exception raised if an asynchronous operation times out""" pass diff --git a/eventlet/channel.py b/eventlet/channel.py new file mode 100644 index 0000000..963088f --- /dev/null +++ b/eventlet/channel.py @@ -0,0 +1,102 @@ +"""\ +@file channel.py +@author Bob Ippolito + +Copyright (c) 2005-2006, Bob Ippolito +Copyright (c) 2007, Linden Research, Inc. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import collections + +from eventlet import api +from eventlet.support import greenlets as greenlet + +__all__ = ['channel'] + +class channel(object): + """A channel is a control flow primitive for co-routines. It is a + "thread-like" queue for controlling flow between two (or more) co-routines. + The state model is: + + * If one co-routine calls send(), it is unscheduled until another + co-routine calls receive(). + * If one co-rounte calls receive(), it is unscheduled until another + co-routine calls send(). + * Once a paired send()/receive() have been called, both co-routeines + are rescheduled. + + This is similar to: http://stackless.com/wiki/Channels + """ + balance = 0 + + def _tasklet_loop(self): + deque = self.deque = collections.deque() + hub = api.get_hub() + current = greenlet.getcurrent() + def switch(g, value=None, exc=None): + if exc is None: + return g.switch(value) + else: + return g.throw(exc) + direction, caller, args = switch(current.parent or current) + try: + while True: + if direction == -1: + # waiting to receive + if self.balance > 0: + sender, args = deque.popleft() + hub.schedule_call(0, switch, sender) + hub.schedule_call(0, switch, caller, *args) + else: + deque.append(caller) + else: + # waiting to send + if self.balance < 0: + receiver = deque.popleft() + hub.schedule_call(0, switch, receiver, *args) + hub.schedule_call(0, switch, caller) + else: + deque.append((caller, args)) + self.balance += direction + direction, caller, args = hub.switch() + finally: + deque.clear() + del self.deque + self.balance = 0 + + def _send_tasklet(self, *args): + try: + t = self._tasklet + except AttributeError: + t = self._tasklet = greenlet.greenlet(self._tasklet_loop) + t.switch() + if args: + return t.switch((1, greenlet.getcurrent(), args)) + else: + return t.switch((-1, greenlet.getcurrent(), args)) + + def receive(self): + return self._send_tasklet() + + def send(self, value): + return self._send_tasklet(value) + + def send_exception(self, exc): + return self._send_tasklet(None, exc) diff --git a/eventlet/coros.py b/eventlet/coros.py index b15f8fd..a8ba5b0 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -558,6 +558,7 @@ class Semaphore(object): if self.counter<=0: self._waiters[api.getcurrent()] = None try: + print "hub switch" api.get_hub().switch() finally: self._waiters.pop(api.getcurrent(), None) @@ -618,6 +619,7 @@ class BoundedSemaphore(object): api.get_hub().schedule_call(0, self._do_unlock) self._acquire_waiters[api.getcurrent()] = None try: + print "HUB switch" api.get_hub().switch() finally: self._acquire_waiters.pop(api.getcurrent(), None) diff --git a/eventlet/green/thread.py b/eventlet/green/thread.py index cd7176a..6ec7d03 100644 --- a/eventlet/green/thread.py +++ b/eventlet/green/thread.py @@ -1,7 +1,7 @@ """implements standard module 'thread' with greenlets""" from __future__ import absolute_import import thread as thread_module -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet.api import spawn from eventlet.coros import semaphore as LockType diff --git a/eventlet/greenlib.py b/eventlet/greenlib.py index e293112..326521f 100644 --- a/eventlet/greenlib.py +++ b/eventlet/greenlib.py @@ -25,7 +25,7 @@ THE SOFTWARE. import sys import itertools -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet import tls diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index d287554..f900b51 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -29,7 +29,7 @@ import errno import traceback import time -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet.timer import Timer _g_debug = True diff --git a/eventlet/hubs/libev.py b/eventlet/hubs/libev.py index d839ecd..02a8b09 100644 --- a/eventlet/hubs/libev.py +++ b/eventlet/hubs/libev.py @@ -32,7 +32,7 @@ import time from eventlet.timer import Timer from eventlet.hubs import hub -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet # XXX for debugging only #raise ImportError() diff --git a/eventlet/hubs/libevent.py b/eventlet/hubs/libevent.py index 1cf5b23..fab7d03 100644 --- a/eventlet/hubs/libevent.py +++ b/eventlet/hubs/libevent.py @@ -33,7 +33,7 @@ from eventlet import greenlib from eventlet.timer import Timer from eventlet.hubs import hub -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet # XXX for debugging only #raise ImportError() diff --git a/eventlet/hubs/selects.py b/eventlet/hubs/selects.py index f8e2297..06e2b36 100644 --- a/eventlet/hubs/selects.py +++ b/eventlet/hubs/selects.py @@ -29,7 +29,7 @@ import time from eventlet.hubs import hub -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet class Hub(hub.BaseHub): def wait(self, seconds=None): diff --git a/eventlet/pools.py b/eventlet/pools.py index a05aae7..b226206 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -29,17 +29,6 @@ import socket from eventlet import api from eventlet import channel -class FanFailed(RuntimeError): - pass - - -class SomeFailed(FanFailed): - pass - - -class AllFailed(FanFailed): - pass - class Pool(object): """ @@ -106,44 +95,6 @@ class Pool(object): """ raise NotImplementedError("Implement in subclass") - def fan(self, block, input_list): - chan = channel.channel() - results = [] - exceptional_results = 0 - for index, input_item in enumerate(input_list): - pool_item = self.get() - - ## Fan out - api.spawn( - self._invoke, block, pool_item, input_item, index, chan) - - ## Fan back in - for i in range(len(input_list)): - ## Wait for all guys to send to the queue - index, value = chan.receive() - if isinstance(value, Exception): - exceptional_results += 1 - results.append((index, value)) - - results.sort() - results = [value for index, value in results] - - if exceptional_results: - if exceptional_results == len(results): - raise AllFailed(results) - raise SomeFailed(results) - return results - - def _invoke(self, block, pool_item, input_item, index, chan): - try: - result = block(pool_item, input_item) - except Exception, e: - self.put(pool_item) - chan.send((index, e)) - return - self.put(pool_item) - chan.send((index, result)) - class Token(object): pass diff --git a/eventlet/support/greenlets.py b/eventlet/support/greenlets.py new file mode 100644 index 0000000..0423542 --- /dev/null +++ b/eventlet/support/greenlets.py @@ -0,0 +1,22 @@ + +try: + import greenlet + getcurrent = greenlet.getcurrent + GreenletExit = greenlet.GreenletExit + greenlet = greenlet.greenlet +except ImportError, e: + print e + try: + from py.magic import greenlet + getcurrent = greenlet.getcurrent + GreenletExit = greenlet.GreenletExit + except ImportError: + try: + from stackless import greenlet + getcurrent = greenlet.getcurrent + GreenletExit = greenlet.GreenletExit + except ImportError: + try: + from support.stacklesss import greenlet, getcurrent, GreenletExit + except ImportError, e: + raise ImportError("Unable to find an implementation of greenlet.") diff --git a/eventlet/twistedutil/join_reactor.py b/eventlet/twistedutil/join_reactor.py index e658fa8..f1f9470 100644 --- a/eventlet/twistedutil/join_reactor.py +++ b/eventlet/twistedutil/join_reactor.py @@ -5,7 +5,7 @@ yourself. """ from eventlet.hubs.twistedr import BaseTwistedHub from eventlet.api import use_hub, _threadlocal -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet use_hub(BaseTwistedHub) assert not hasattr(_threadlocal, 'hub') diff --git a/eventlet/util.py b/eventlet/util.py index 9583e25..ed5be69 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -48,7 +48,7 @@ except ImportError: def g_log(*args): import sys - from eventlet.support import greenlet + from eventlet.support import greenlets as greenlet from eventlet.greenlib import greenlet_id g_id = greenlet_id() if g_id is None: diff --git a/greentest/coros_test.py b/greentest/coros_test.py index bf509d9..5f10cea 100644 --- a/greentest/coros_test.py +++ b/greentest/coros_test.py @@ -213,6 +213,7 @@ class TestCoroutinePool(tests.TestCase): for x in range(6): pool.execute(lambda n: n, x) for y in range(6): + print "wait", y pool.wait() def test_track_slow_event(self): diff --git a/greentest/httpc_test.py b/greentest/httpc_test.py index c43b3fa..fd88a74 100644 --- a/greentest/httpc_test.py +++ b/greentest/httpc_test.py @@ -22,11 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import cgi + from eventlet import api from eventlet import httpc -from eventlet import httpd from eventlet import processes from eventlet import util +from eventlet import wsgi + import time try: from cStringIO import StringIO @@ -41,64 +44,74 @@ class Site(object): def __init__(self): self.stuff = {'hello': 'hello world'} - def adapt(self, obj, req): - req.write(str(obj)) + def __call__(self, env, start_response): + return getattr(self, 'handle_%s' % env['REQUEST_METHOD'].lower())(env, start_response) + + +def _get_query_pairs(env): + parsed = cgi.parse_qs(env['QUERY_STRING']) + for key, values in parsed.items(): + for val in values: + yield key, val + + +def get_query_pairs(env): + return list(_get_query_pairs(env)) + - def handle_request(self, req): - return getattr(self, 'handle_%s' % req.method().lower())(req) class BasicSite(Site): - def handle_get(self, req): - req.set_header('x-get', 'hello') + def handle_get(self, env, start_response): + headers = [('x-get', 'hello'), ('Content-type', 'text/plain')] resp = StringIO() - path = req.path().lstrip('/') + path = env['PATH_INFO'].lstrip('/') try: resp.write(self.stuff[path]) except KeyError: - req.response(404, body='Not found') - return - for k,v in req.get_query_pairs(): + start_response("404 Not Found", headers) + return ["Not Found"] + for k,v in get_query_pairs(env): resp.write(k + '=' + v + '\n') - req.write(resp.getvalue()) + start_response("200 OK", headers) + return [resp.getvalue()] - def handle_head(self, req): - req.set_header('x-head', 'hello') - path = req.path().lstrip('/') - try: - req.write('') - except KeyError: - req.response(404, body='Not found') + def handle_head(self, env, start_response): + headers = [('x-head', 'hello'), ('Content-type', 'text/plain')] + start_response("200 OK", headers) + return [""] - def handle_put(self, req): - req.set_header('x-put', 'hello') - path = req.path().lstrip('/') + def handle_put(self, env, start_response): + headers = [('x-put', 'hello'), ('Content-type', 'text/plain')] + path = env['PATH_INFO'].lstrip('/') if not path: - req.response(400, body='') - return - if path in self.stuff: - req.response(204) - else: - req.response(201) - self.stuff[path] = req.read_body() - req.write('') + start_response("400 Bad Request", headers) + return [""] - def handle_delete(self, req): - req.set_header('x-delete', 'hello') - path = req.path().lstrip('/') + if path in self.stuff: + start_response("204 No Content", headers) + else: + start_response("201 Created", headers) + self.stuff[path] = env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0'))) + return [""] + + def handle_delete(self, env, start_response): + headers = [('x-delete', 'hello'), ('Content-type', 'text/plain')] + path = env['PATH_INFO'].lstrip('/') if not path: - req.response(400, body='') - return + start_response("400 Bad Request", headers) + return [""] try: del self.stuff[path] - req.response(204) + start_response("204 No Content", headers) except KeyError: - req.response(404) - req.write('') + start_response("404 Not Found", headers) + return [""] - def handle_post(self, req): - req.set_header('x-post', 'hello') - req.write(req.read_body()) + def handle_post(self, env, start_response): + headers = [('x-post', 'hello'), ('Content-type', 'text/plain')] + start_response("200 OK", headers) + return [env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))] class TestBase(object): @@ -109,7 +122,7 @@ class TestBase(object): def setUp(self): self.logfile = StringIO() - self.victim = api.spawn(httpd.server, + self.victim = api.spawn(wsgi.server, api.tcp_listener(('0.0.0.0', 31337)), self.site_class(), log=self.logfile, @@ -211,45 +224,44 @@ class TestHttpc(TestBase, tests.TestCase): class RedirectSite(BasicSite): - response_code = 301 + response_code = "301 Moved Permanently" + + def __call__(self, env, start_response): + path = env['PATH_INFO'] + if path.startswith('/redirect/'): + url = 'http://' + env['HTTP_HOST'] + path.replace('/redirect/', '/') + start_response(self.response_code, [("Location", url)]) + return [""] + return super(RedirectSite, self).__call__(env, start_response) - def handle_request(self, req): - if req.path().startswith('/redirect/'): - url = ('http://' + req.get_header('host') + - req.uri().replace('/redirect/', '/')) - req.response(self.response_code, headers={'location': url}, - body='') - return - return Site.handle_request(self, req) class Site301(RedirectSite): pass class Site302(BasicSite): - def handle_request(self, req): - if req.path().startswith('/expired/'): - url = ('http://' + req.get_header('host') + - req.uri().replace('/expired/', '/')) - headers = {'location': url, 'expires': '0'} - req.response(302, headers=headers, body='') - return - if req.path().startswith('/expires/'): - url = ('http://' + req.get_header('host') + - req.uri().replace('/expires/', '/')) + def __call__(self, env, start_response): + path = env['PATH_INFO'] + if path.startswith('/expired/'): + url = 'http://' + env['HTTP_HOST'] + path.replace('/expired/', '/') + headers = [('location', url), ('expires', '0')] + start_response("302 Found", headers) + return [""] + if path.startswith('/expires/'): + url = 'http://' + env['HTTP_HOST'] + path.replace('/expires/', '/') expires = time.time() + (100 * 24 * 60 * 60) - headers = {'location': url, 'expires': httpc.to_http_time(expires)} - req.response(302, headers=headers, body='') - return - return Site.handle_request(self, req) + headers = [('location', url), ('expires', httpc.to_http_time(expires))] + start_response("302 Found", headers) + return [""] + return super(Site302, self).__call__(env, start_response) class Site303(RedirectSite): - response_code = 303 + response_code = "303 See Other" class Site307(RedirectSite): - response_code = 307 + response_code = "307 Temporary Redirect" class TestHttpc301(TestBase, tests.TestCase): @@ -332,15 +344,9 @@ class TestHttpc307(TestBase, tests.TestCase): class Site500(BasicSite): - def handle_request(self, req): - req.response(500, body="screw you world") - return - - -class Site500(BasicSite): - def handle_request(self, req): - req.response(500, body="screw you world") - return + def __call__(self, env, start_response): + start_response("500 Internal Server Error", [("Content-type", "text/plain")]) + return ["screw you world"] class TestHttpc500(TestBase, tests.TestCase): @@ -361,8 +367,9 @@ class TestHttpc500(TestBase, tests.TestCase): class Site504(BasicSite): - def handle_request(self, req): - req.response(504, body="screw you world") + def __call__(self, env, start_response): + start_response("504 Gateway Timeout", [("Content-type", "text/plain")]) + return ["screw you world"] class TestHttpc504(TestBase, tests.TestCase): diff --git a/greentest/pools_test.py b/greentest/pools_test.py index e604bc1..282067a 100644 --- a/greentest/pools_test.py +++ b/greentest/pools_test.py @@ -22,10 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import time import unittest from eventlet import api -from eventlet import channel +from eventlet import coros from eventlet import pools @@ -65,13 +66,19 @@ class TestIntPool(unittest.TestCase): self.assertEquals(self.pool.free(), 4) def test_exhaustion(self): - waiter = channel.channel() + waiter = coros.event() def consumer(): gotten = None + cancel = api.exc_after(1, api.TimeoutError) try: + print time.asctime(), "getting" gotten = self.pool.get() + print time.asctime(), "got" finally: + cancel.cancel() + print "waiter send" waiter.send(gotten) + print "waiter sent" api.spawn(consumer) @@ -82,14 +89,17 @@ class TestIntPool(unittest.TestCase): # Let consumer run; nothing will be in the pool, so he will wait api.sleep(0) + print "put in pool", one # Wake consumer self.pool.put(one) + print "done put" # wait for the consumer - self.assertEquals(waiter.receive(), one) + self.assertEquals(waiter.wait(), one) + print "done wait" def test_blocks_on_pool(self): - waiter = channel.channel() + waiter = coros.event() def greedy(): self.pool.get() self.pool.get() @@ -98,7 +108,9 @@ class TestIntPool(unittest.TestCase): # No one should be waiting yet. self.assertEquals(self.pool.waiting(), 0) # The call to the next get will unschedule this routine. + print "calling get" self.pool.get() + print "called get" # So this send should never be called. waiter.send('Failed!') @@ -113,8 +125,8 @@ class TestIntPool(unittest.TestCase): ## Greedy should be blocking on the last get self.assertEquals(self.pool.waiting(), 1) - ## Send will never be called, so balance should be 0. - self.assertEquals(waiter.balance, 0) + ## Send will never be called, so the event should not be ready. + self.assertEquals(waiter.ready(), False) api.kill(killable) @@ -140,38 +152,6 @@ class TestIntPool2(unittest.TestCase): gotten = self.pool.get() self.assertEquals(gotten, 1) - -ALWAYS = RuntimeError('I always fail') -SOMETIMES = RuntimeError('I fail half the time') - - -class TestFan(unittest.TestCase): - mode = 'static' - def setUp(self): - self.pool = IntPool(max_size=2) - - def test_with_list(self): - list_of_input = ['agent-one', 'agent-two', 'agent-three'] - - def my_callable(pool_item, next_thing): - ## Do some "blocking" (yielding) thing - api.sleep(0.01) - return next_thing - - output = self.pool.fan(my_callable, list_of_input) - self.assertEquals(list_of_input, output) - - def test_all_fail(self): - def my_failure(pool_item, next_thing): - raise ALWAYS - self.assertRaises(pools.AllFailed, self.pool.fan, my_failure, range(4)) - - def test_some_fail(self): - def my_failing_callable(pool_item, next_thing): - if next_thing % 2: - raise SOMETIMES - return next_thing - self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4)) if __name__ == '__main__': diff --git a/greentest/processes_test.py b/greentest/processes_test.py index e2c1660..98ce505 100644 --- a/greentest/processes_test.py +++ b/greentest/processes_test.py @@ -23,7 +23,6 @@ THE SOFTWARE. """ from greentest import tests from eventlet import api -from eventlet import channel from eventlet import processes class TestEchoPool(tests.TestCase): diff --git a/greentest/saranwrap_test.py b/greentest/saranwrap_test.py index df0878d..32be606 100644 --- a/greentest/saranwrap_test.py +++ b/greentest/saranwrap_test.py @@ -225,7 +225,7 @@ class TestSaranwrap(unittest.TestCase): tid = make_uuid() self.assertEqual(tid.get_version(), uuid.uuid4().get_version()) def make_list(): - from eventlet import saranwrap_test + from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test.list_maker) # after this function returns, prox should fall out of scope return prox() @@ -270,7 +270,7 @@ sys_path = sys.path""") sys.path.remove(temp_dir) def test_contention(self): - from eventlet import saranwrap_test + from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test) pool = coros.CoroutinePool(max_size=4) @@ -296,7 +296,7 @@ sys_path = sys.path""") def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish - from eventlet import saranwrap_test + from greentest import saranwrap_test prox = saranwrap.wrap([saranwrap_test.list_maker]) self.assertEquals(list_maker(), prox[0]())