This commit is contained in:
donovan
2008-12-29 16:15:14 -08:00
23 changed files with 151 additions and 1019 deletions

View File

@@ -6,3 +6,7 @@ dist
eventlet.egg-info
build
htmlreports
*.esproj
.DS_Store
results.*.db

View File

@@ -30,7 +30,7 @@ import linecache
import inspect
import traceback
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
from eventlet import tls
__all__ = [
@@ -40,6 +40,12 @@ __all__ = [
'unspew', 'use_hub', 'with_timeout', 'timeout']
def switch(coro, result=None, exc=None):
if exc is not None:
return coro.throw(exc)
return coro.switch(result)
class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out"""
pass

View File

@@ -1,40 +0,0 @@
import collections, time, Queue
qt = 10000
l1 = collections.deque()
l2 = []
l3 = Queue.Queue()
start = time.time()
for i in range(1,qt):
l1.append(i)
for i in range(1,qt):
l1.popleft()
mid = time.time()
for i in range(1,qt):
l2.append(i)
for i in range(1,qt):
l2.pop(0)
mid2 = time.time()
for i in range(1,qt):
l3.put_nowait(i)
for i in range(1,qt):
l3.get_nowait()
end = time.time()
dtime = mid - start
ltime = mid2 - mid
qtime = end - mid2
print "deque:", dtime
print " list:", ltime
print "queue:", qtime

View File

@@ -26,7 +26,7 @@ THE SOFTWARE.
import collections
from eventlet import api
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
__all__ = ['channel']

View File

@@ -558,6 +558,7 @@ class Semaphore(object):
if self.counter<=0:
self._waiters[api.getcurrent()] = None
try:
print "hub switch"
api.get_hub().switch()
finally:
self._waiters.pop(api.getcurrent(), None)
@@ -618,6 +619,7 @@ class BoundedSemaphore(object):
api.get_hub().schedule_call_global(0, self._do_unlock)
self._acquire_waiters[api.getcurrent()] = None
try:
print "HUB switch"
api.get_hub().switch()
finally:
self._acquire_waiters.pop(api.getcurrent(), None)

View File

@@ -1,7 +1,7 @@
"""implements standard module 'thread' with greenlets"""
from __future__ import absolute_import
import thread as thread_module
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
from eventlet.api import spawn
from eventlet.coros import Semaphore as LockType

View File

@@ -25,7 +25,7 @@ THE SOFTWARE.
import sys
import itertools
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
from eventlet import tls

View File

@@ -1,135 +0,0 @@
from eventlet.support import greenlet
greenlet.main = greenlet.getcurrent() # WTF did greenlet.main go?
from twisted.internet import defer, reactor
def _desc(g):
if isinstance(g, DebugGreenlet):
if hasattr(g, 'name'):
desc = "<%s %s" % (g.name, hex(id(g)))
else:
desc = "<NO NAME!? %s" % (hex(id(g)), )
else:
desc = "<%s" % (hex(id(g)),)
if g is greenlet.main:
desc += " (main)"
desc += ">"
return desc
class DebugGreenlet(greenlet.greenlet):
__slots__ = ('name',)
def __init__(self, func, name=None):
super(DebugGreenlet, self).__init__(func)
self.name = name
def switch(self, *args, **kwargs):
current = greenlet.getcurrent()
#print "%s -> %s" % (_desc(current), _desc(self))
return super(DebugGreenlet, self).switch(*args, **kwargs)
def deferredGreenlet(func):
"""
I am a function decorator for functions that call blockOn. The
function I return will call the original function inside of a
greenlet, and return a Deferred.
TODO: Do a hack so the name of 'replacement' is the name of 'func'.
"""
def replacement(*args, **kwargs):
d = defer.Deferred()
def greenfunc(*args, **kwargs):
try:
d.callback(func(*args, **kwargs))
except:
d.errback()
g = greenlet.greenlet(greenfunc)
crap = g.switch(*args, **kwargs)
return d
return replacement
class CalledFromMain(Exception):
pass
class _IAmAnException(object):
def __init__(self, f):
self.f = f
def blockOn(d, desc=None):
"""
Use me in non-main greenlets to wait for a Deferred to fire.
"""
g = greenlet.getcurrent()
if g is greenlet.main:
raise CalledFromMain("You cannot call blockOn from the main greenlet.")
## Note ##
# Notice that this code catches and ignores GreenletExit. The
# greenlet mechanism sends a GreenletExit at a blocking greenlet if
# there is no chance that the greenlet will be fired by anyone
# else -- that is, no other greenlets have a reference to the one
# that's blocking.
# This is often the case with blockOn. When someone blocks on a
# Deferred, these callbacks are added to it. When the deferred
# fires, we make the blockOn() call finish -- we resume the
# blocker. At that point, the Deferred chain is irrelevant; it
# makes no sense for any other callbacks to be called. The
# Deferred, then, will likely be garbage collected and thus all
# references to our greenlet will be lost -- and thus it will have
# GreenletExit fired.
def cb(r):
try:
# This callback might be fired immediately when added
# and switching to the current greenlet seems to do nothing
# (ie. we will never actually return to the function we called
# blockOn from), so we make the call happen later in the main greenlet
# instead, if the current greenlet is the same as the one we are swithcing
# to.
if g == greenlet.getcurrent():
reactor.callLater(0, g.switch, r)
else:
g.switch(r)
except greenlet.GreenletExit:
pass
def eb(f):
try:
g.switch(_IAmAnException(f))
except greenlet.GreenletExit:
pass
d.addCallbacks(cb, eb)
x = g.parent.switch()
if isinstance(x, _IAmAnException):
x.f.raiseException()
return x
class GreenletWrapper(object):
"""Wrap an object which presents an asynchronous interface (via Deferreds).
The wrapped object will present the same interface, but all methods will
return results, rather than Deferreds.
When a Deferred would otherwise be returned, a greenlet is created and then
control is switched back to the main greenlet. When the Deferred fires,
control is switched back to the created greenlet and execution resumes with
the result.
"""
def __init__(self, wrappee):
self.wrappee = wrappee
def __getattribute__(self, name):
wrappee = super(GreenletWrapper, self).__getattribute__('wrappee')
original = getattr(wrappee, name)
if callable(original):
def wrapper(*a, **kw):
result = original(*a, **kw)
if isinstance(result, defer.Deferred):
return blockOn(result)
return result
return wrapper
return original

View File

@@ -1,605 +0,0 @@
"""\
@file httpd.py
@author Donovan Preston
Copyright (c) 2005-2006, Donovan Preston
Copyright (c) 2007, Linden Research, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import cgi
import errno
from eventlet.green import socket
import sys
import time
from eventlet.green import urllib
import traceback
from eventlet.green import BaseHTTPServer
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from eventlet import api
from eventlet import coros
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
USE_ACCESS_LOG = True
CONNECTION_CLOSED = (errno.EPIPE, errno.ECONNRESET)
class ErrorResponse(Exception):
_responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
def __init__(self, code, reason_phrase=None, headers=None, body=None):
Exception.__init__(self, reason_phrase)
self.code = code
if reason_phrase is None:
self.reason = self._responses[code][0]
else:
self.reason = reason_phrase
self.headers = headers
if body is None:
self.body = self._responses[code][1]
else:
self.body = body
class Request(object):
_method = None
_path = None
_responsecode = 200
_reason_phrase = None
_request_started = False
_chunked = False
_producer_adapters = {}
depth = 0
def __init__(self, protocol, method, path, headers):
self.context = {}
self.request_start_time = time.time()
self.site = protocol.server.site
self.protocol = protocol
self._method = method
if '?' in path:
self._path, self._query = path.split('?', 1)
self._query = self._query.replace('&amp;', '&')
else:
self._path, self._query = path, None
self._incoming_headers = headers
self._outgoing_headers = dict()
def response(self, code, reason_phrase=None, headers=None, body=None):
"""Change the response code. This will not be sent until some
data is written; last call to this method wins. Default is
200 if this is not called.
"""
self._responsecode = code
self._reason_phrase = reason_phrase
self.protocol.set_response_code(self, code, reason_phrase)
if headers is not None:
try:
headers = headers.iteritems()
except AttributeError:
pass
for key, value in headers:
self.set_header(key, value)
if body is not None:
self.write(body)
def is_okay(self):
return 200 <= self._responsecode <= 299
def full_url(self):
path = self.path()
query = self.query()
if query:
path = path + '?' + query
via = self.get_header('via', '')
if via.strip():
next_part = iter(via.split()).next
received_protocol = next_part()
received_by = next_part()
if received_by.endswith(','):
received_by = received_by[:-1]
else:
comment = ''
while not comment.endswith(','):
try:
comment += next_part()
except StopIteration:
comment += ','
break
comment = comment[:-1]
else:
received_by = self.get_header('host')
return '%s://%s%s' % (self.request_protocol(), received_by, path)
def begin_response(self, length="-"):
"""Begin the response, and return the initial response text
"""
self._request_started = True
request_time = time.time() - self.request_start_time
code = self._responsecode
proto = self.protocol
if USE_ACCESS_LOG:
proto.server.write_access_log_line(
proto.client_address[0],
time.strftime("%d/%b/%Y %H:%M:%S"),
proto.requestline,
code,
length,
request_time)
if self._reason_phrase is not None:
message = self._reason_phrase.split("\n")[0]
elif code in proto.responses:
message = proto.responses[code][0]
else:
message = ''
if proto.request_version == 'HTTP/0.9':
return []
response_lines = proto.generate_status_line()
if not self._outgoing_headers.has_key('connection'):
con = self.get_header('connection')
if con is None and proto.request_version == 'HTTP/1.0':
con = 'close'
if con is not None:
self.set_header('connection', con)
for key, value in self._outgoing_headers.items():
key = '-'.join([x.capitalize() for x in key.split('-')])
response_lines.append("%s: %s" % (key, value))
response_lines.append("")
return response_lines
def write(self, obj):
"""Writes an arbitrary object to the response, using
the sitemap's adapt method to convert it to bytes.
"""
if isinstance(obj, str):
self._write_bytes(obj)
elif isinstance(obj, unicode):
# use utf8 encoding for now, *TODO support charset negotiation
# Content-Type: text/html; charset=utf-8
ctype = self._outgoing_headers.get('content-type', 'text/html')
ctype = ctype + '; charset=utf-8'
self._outgoing_headers['content-type'] = ctype
self._write_bytes(obj.encode('utf8'))
else:
self.site.adapt(obj, self)
def _write_bytes(self, data):
"""Write all the data of the response.
Can be called just once.
"""
if self._request_started:
print "Request has already written a response:"
traceback.print_stack()
return
self._outgoing_headers['content-length'] = len(data)
response_lines = self.begin_response(len(data))
response_lines.append(data)
self.protocol.wfile.write("\r\n".join(response_lines))
if hasattr(self.protocol.wfile, 'flush'):
self.protocol.wfile.flush()
def method(self):
return self._method
def path(self):
return self._path
def path_segments(self):
return [urllib.unquote_plus(x) for x in self._path.split('/')[1:]]
def query(self):
return self._query
def uri(self):
if self._query:
return '%s?%s' % (
self._path, self._query)
return self._path
def get_headers(self):
return self._incoming_headers
def get_header(self, header_name, default=None):
return self.get_headers().get(header_name.lower(), default)
def get_query_pairs(self):
if not hasattr(self, '_split_query'):
if self._query is None:
self._split_query = ()
else:
spl = self._query.split('&')
spl = [x.split('=', 1) for x in spl if x]
self._split_query = []
for query in spl:
if len(query) == 1:
key = query[0]
value = ''
else:
key, value = query
self._split_query.append((urllib.unquote_plus(key), urllib.unquote_plus(value)))
return self._split_query
def get_queries_generator(self, name):
"""Generate all query parameters matching the given name.
"""
for key, value in self.get_query_pairs():
if key == name or not name:
yield value
get_queries = lambda self, name: list(self.get_queries_generator)
def get_query(self, name, default=None):
try:
return self.get_queries_generator(name).next()
except StopIteration:
return default
def get_arg_list(self, name):
return self.get_field_storage().getlist(name)
def get_arg(self, name, default=None):
return self.get_field_storage().getfirst(name, default)
def get_field_storage(self):
if not hasattr(self, '_field_storage'):
if self.method() == 'GET':
data = ''
if self._query:
data = self._query
else:
data = self.read_body()
fl = StringIO(data)
## Allow our resource to provide the FieldStorage instance for
## customization purposes.
headers = self.get_headers()
environ = dict(
REQUEST_METHOD='POST',
QUERY_STRING=self._query or '')
self._field_storage = cgi.FieldStorage(fl, headers, environ=environ)
return self._field_storage
def set_header(self, key, value):
if key.lower() == 'connection' and value.lower() == 'close':
self.protocol.close_connection = 1
self._outgoing_headers[key.lower()] = value
__setitem__ = set_header
def get_outgoing_header(self, key):
return self._outgoing_headers[key.lower()]
def has_outgoing_header(self, key):
return self._outgoing_headers.has_key(key.lower())
def socket(self):
return self.protocol.socket
def error(self, response=None, body=None, log_traceback=True):
if log_traceback:
traceback.print_exc(file=self.log)
if response is None:
response = 500
if body is None:
typ, val, tb = sys.exc_info()
body = dict(type=str(typ), error=True, reason=str(val))
self.response(response)
if(type(body) is str and not self.response_written()):
self.write(body)
return
try:
produce(body, self)
except Exception, e:
traceback.print_exc(file=self.log)
if not self.response_written():
self.write('Internal Server Error')
def not_found(self):
self.error(404, 'Not Found\n', log_traceback=False)
def raw_body(self):
if not hasattr(self, '_cached_body'):
self.read_body()
return self._cached_body
def read_body(self):
""" Returns the string body that was read off the request, or
the empty string if there was no request body.
Requires a content-length header. Caches the body so multiple
calls to read_body() are free.
"""
if not hasattr(self, '_cached_body'):
length = self.get_header('content-length')
if length:
length = int(length)
if length:
self._cached_body = self.protocol.rfile.read(length)
else:
self._cached_body = ''
return self._cached_body
def parsed_body(self):
""" Returns the parsed version of the body, using the
content-type header to select from the parsers on the site
object.
If no parser is found, returns the string body from
read_body(). Caches the parsed body so multiple calls to
parsed_body() are free.
"""
if not hasattr(self, '_cached_parsed_body'):
body = self.read_body()
if hasattr(self.site, 'parsers'):
ct = self.get_header('content-type')
parser = self.site.parsers.get(ct)
if parser is not None:
body = parser(body)
else:
ex = ValueError("Could not find parser for content-type: %s" % ct)
ex.body = body
raise ex
self._cached_parsed_body = body
return self._cached_parsed_body
def override_body(self, body):
if not hasattr(self, '_cached_parsed_body'):
self.read_body() ## Read and discard body
self._cached_parsed_body = body
def response_written(self):
## TODO change badly named variable
return self._request_started
def request_version(self):
return self.protocol.request_version
def request_protocol(self):
if self.protocol.is_secure:
return "https"
return "http"
def server_address(self):
return self.protocol.server.address
def __repr__(self):
return "<Request %s %s>" % (
getattr(self, '_method'), getattr(self, '_path'))
DEFAULT_TIMEOUT = 300
# This value was chosen because apache 2 has a default limit of 8190.
# I believe that slightly smaller number is because apache does not
# count the \r\n.
MAX_REQUEST_LINE = 8192
class Timeout(RuntimeError):
pass
class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.rfile = self.wfile = request.makeGreenFile()
self.is_secure = request.is_secure
request.close() # close this now so that when rfile and wfile are closed, the socket gets closed
self.client_address = client_address
self.server = server
self.set_response_code(None, 200, None)
self.protocol_version = server.max_http_version
def close(self):
self.rfile.close()
self.wfile.close()
def set_response_code(self, request, code, message):
self._code = code
if message is not None:
self._message = message.split("\n")[0]
elif code in self.responses:
self._message = self.responses[code][0]
else:
self._message = ''
def generate_status_line(self):
return [
"%s %d %s" % (
self.protocol_version, self._code, self._message)]
def write_bad_request(self, status, reason):
self.set_response_code(self, status, reason)
self.wfile.write(''.join(self.generate_status_line()))
self.wfile.write('\r\nServer: %s\r\n' % self.version_string())
self.wfile.write('Date: %s\r\n' % self.date_time_string())
self.wfile.write('Content-Length: 0\r\n\r\n')
def handle(self):
self.close_connection = 0
timeout = DEFAULT_TIMEOUT
while not self.close_connection:
if timeout == 0:
break
cancel = api.exc_after(timeout, Timeout)
try:
self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE)
if self.raw_requestline is not None:
if len(self.raw_requestline) == MAX_REQUEST_LINE:
# Someone sent a request line which is too
# large. Be helpful and tell them.
self.write_bad_request(414, 'Request-URI Too Long')
self.close_connection = True
continue
except socket.error, e:
if e[0] in CONNECTION_CLOSED:
self.close_connection = True
cancel.cancel()
continue
except Timeout:
self.close_connection = True
continue
except Exception, e:
try:
if e[0][0][0].startswith('SSL'):
print "SSL Error:", e[0][0]
self.close_connection = True
cancel.cancel()
continue
except Exception, f:
print "Exception in ssl test:",f
pass
raise e
cancel.cancel()
if not self.raw_requestline or not self.parse_request():
self.close_connection = True
continue
self.set_response_code(None, 200, None)
request = Request(self, self.command, self.path, self.headers)
request.set_header('Server', self.version_string())
request.set_header('Date', self.date_time_string())
try:
timeout = int(request.get_header('keep-alive', timeout))
except TypeError, ValueError:
pass
try:
try:
try:
self.server.site.handle_request(request)
except ErrorResponse, err:
request.response(code=err.code,
reason_phrase=err.reason,
headers=err.headers,
body=err.body)
finally:
# clean up any timers that might have been left around by the handling code
api.get_hub().cancel_timers(api.getcurrent())
# throw an exception if it failed to write a body
if not request.response_written():
raise NotImplementedError("Handler failed to write response to request: %s" % request)
if not hasattr(self, '_cached_body'):
try:
request.read_body() ## read & discard body
except:
pass
except socket.error, e:
# Broken pipe, connection reset by peer
if e[0] in CONNECTION_CLOSED:
#print "Remote host closed connection before response could be sent"
pass
else:
raise
except Exception, e:
self.server.log_message("Exception caught in HttpRequest.handle():\n")
self.server.log_exception(*sys.exc_info())
if not request.response_written():
request.response(500)
request.write('Internal Server Error')
self.close()
raise e # can't do a plain raise since exc_info might have been cleared
self.close()
class Server(BaseHTTPServer.HTTPServer):
def __init__(self, socket, address, site, log, max_http_version=DEFAULT_MAX_HTTP_VERSION):
self.socket = socket
self.address = address
self.site = site
self.max_http_version = max_http_version
if log:
self.log = log
if hasattr(log, 'info'):
log.write = log.info
else:
self.log = self
def write(self, something):
sys.stdout.write('%s' % (something, )); sys.stdout.flush()
def log_message(self, message):
self.log.write(message)
def log_exception(self, type, value, tb):
self.log.write(''.join(traceback.format_exception(type, value, tb)))
def write_access_log_line(self, *args):
"""Write a line to the access.log. Arguments:
client_address, date_time, requestline, code, size, request_time
"""
self.log.write(
'%s - - [%s] "%s" %s %s %.6f\n' % args)
def server(sock, site, log=None, max_size=512, serv=None, max_http_version=DEFAULT_MAX_HTTP_VERSION):
pool = coros.CoroutinePool(max_size=max_size)
if serv is None:
serv = Server(sock, sock.getsockname(), site, log, max_http_version=max_http_version)
try:
serv.log.write("httpd starting up on %s\n" % (sock.getsockname(), ))
while True:
try:
new_sock, address = sock.accept()
proto = HttpProtocol(new_sock, address, serv)
pool.execute_async(proto.handle)
api.sleep(0) # sleep to allow other coros to run
except KeyboardInterrupt:
api.get_hub().remove_descriptor(sock.fileno())
serv.log.write("httpd exiting\n")
break
finally:
try:
sock.close()
except socket.error:
pass
if __name__ == '__main__':
class TestSite(object):
def handle_request(self, req):
req.write('hello')
server(
api.tcp_listener(('127.0.0.1', 8080)),
TestSite())

View File

@@ -29,7 +29,7 @@ import errno
import traceback
import time
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
from eventlet.timer import Timer
_g_debug = True

View File

@@ -32,7 +32,7 @@ import time
from eventlet.timer import Timer
from eventlet.hubs import hub
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
# XXX for debugging only
#raise ImportError()

View File

@@ -33,7 +33,7 @@ from eventlet import greenlib
from eventlet.timer import Timer
from eventlet.hubs import hub
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
# XXX for debugging only
#raise ImportError()

View File

@@ -1,60 +0,0 @@
"""\
@file selecthub.py
Copyright (c) 2005-2006, Bob Ippolito
Copyright (c) 2007, Linden Research, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import sys
import select
import errno
import time
from eventlet.hubs import hub
from eventlet.support import greenlet
class Hub(hub.BaseHub):
def wait(self, seconds=None):
readers = self.readers
writers = self.writers
excs = self.excs
if not readers and not writers and not excs:
if seconds:
time.sleep(seconds)
return
try:
r, w, ig = select.select(readers.keys(), writers.keys(), [], seconds)
except select.error, e:
if e.args[0] == errno.EINTR:
return
raise
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
for observed, events in ((readers, r), (writers, w)):
#print "events", r, " ", w
for fileno in events:
try:
cb = observed.get(fileno)
#print "cb", cb, " ", observed
if cb is not None:
cb(fileno)
except SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())

View File

@@ -29,7 +29,7 @@ import time
from eventlet.hubs import hub
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
class Hub(hub.BaseHub):
def wait(self, seconds=None):

View File

@@ -29,17 +29,6 @@ import socket
from eventlet import api
from eventlet import channel
class FanFailed(RuntimeError):
pass
class SomeFailed(FanFailed):
pass
class AllFailed(FanFailed):
pass
class Pool(object):
"""
@@ -106,44 +95,6 @@ class Pool(object):
"""
raise NotImplementedError("Implement in subclass")
def fan(self, block, input_list):
chan = channel.channel()
results = []
exceptional_results = 0
for index, input_item in enumerate(input_list):
pool_item = self.get()
## Fan out
api.spawn(
self._invoke, block, pool_item, input_item, index, chan)
## Fan back in
for i in range(len(input_list)):
## Wait for all guys to send to the queue
index, value = chan.receive()
if isinstance(value, Exception):
exceptional_results += 1
results.append((index, value))
results.sort()
results = [value for index, value in results]
if exceptional_results:
if exceptional_results == len(results):
raise AllFailed(results)
raise SomeFailed(results)
return results
def _invoke(self, block, pool_item, input_item, index, chan):
try:
result = block(pool_item, input_item)
except Exception, e:
self.put(pool_item)
chan.send((index, e))
return
self.put(pool_item)
chan.send((index, result))
class Token(object):
pass

View File

@@ -0,0 +1,22 @@
try:
import greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
greenlet = greenlet.greenlet
except ImportError, e:
print e
try:
from py.magic import greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
except ImportError:
try:
from stackless import greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
except ImportError:
try:
from support.stacklesss import greenlet, getcurrent, GreenletExit
except ImportError, e:
raise ImportError("Unable to find an implementation of greenlet.")

View File

@@ -5,7 +5,7 @@ yourself.
"""
from eventlet.hubs.twistedr import BaseTwistedHub
from eventlet.api import use_hub, _threadlocal
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
use_hub(BaseTwistedHub)
assert not hasattr(_threadlocal, 'hub')

View File

@@ -48,7 +48,7 @@ except ImportError:
def g_log(*args):
import sys
from eventlet.support import greenlet
from eventlet.support import greenlets as greenlet
from eventlet.greenlib import greenlet_id
g_id = greenlet_id()
if g_id is None:

View File

@@ -213,6 +213,7 @@ class TestCoroutinePool(tests.TestCase):
for x in range(6):
pool.execute(lambda n: n, x)
for y in range(6):
print "wait", y
pool.wait()
def test_track_slow_event(self):

View File

@@ -22,11 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import cgi
from eventlet import api
from eventlet import httpc
from eventlet import httpd
from eventlet import processes
from eventlet import util
from eventlet import wsgi
import time
try:
from cStringIO import StringIO
@@ -41,64 +44,74 @@ class Site(object):
def __init__(self):
self.stuff = {'hello': 'hello world'}
def adapt(self, obj, req):
req.write(str(obj))
def __call__(self, env, start_response):
return getattr(self, 'handle_%s' % env['REQUEST_METHOD'].lower())(env, start_response)
def _get_query_pairs(env):
parsed = cgi.parse_qs(env['QUERY_STRING'])
for key, values in parsed.items():
for val in values:
yield key, val
def get_query_pairs(env):
return list(_get_query_pairs(env))
def handle_request(self, req):
return getattr(self, 'handle_%s' % req.method().lower())(req)
class BasicSite(Site):
def handle_get(self, req):
req.set_header('x-get', 'hello')
def handle_get(self, env, start_response):
headers = [('x-get', 'hello'), ('Content-type', 'text/plain')]
resp = StringIO()
path = req.path().lstrip('/')
path = env['PATH_INFO'].lstrip('/')
try:
resp.write(self.stuff[path])
except KeyError:
req.response(404, body='Not found')
return
for k,v in req.get_query_pairs():
start_response("404 Not Found", headers)
return ["Not Found"]
for k,v in get_query_pairs(env):
resp.write(k + '=' + v + '\n')
req.write(resp.getvalue())
start_response("200 OK", headers)
return [resp.getvalue()]
def handle_head(self, req):
req.set_header('x-head', 'hello')
path = req.path().lstrip('/')
try:
req.write('')
except KeyError:
req.response(404, body='Not found')
def handle_head(self, env, start_response):
headers = [('x-head', 'hello'), ('Content-type', 'text/plain')]
start_response("200 OK", headers)
return [""]
def handle_put(self, req):
req.set_header('x-put', 'hello')
path = req.path().lstrip('/')
def handle_put(self, env, start_response):
headers = [('x-put', 'hello'), ('Content-type', 'text/plain')]
path = env['PATH_INFO'].lstrip('/')
if not path:
req.response(400, body='')
return
if path in self.stuff:
req.response(204)
else:
req.response(201)
self.stuff[path] = req.read_body()
req.write('')
start_response("400 Bad Request", headers)
return [""]
def handle_delete(self, req):
req.set_header('x-delete', 'hello')
path = req.path().lstrip('/')
if path in self.stuff:
start_response("204 No Content", headers)
else:
start_response("201 Created", headers)
self.stuff[path] = env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))
return [""]
def handle_delete(self, env, start_response):
headers = [('x-delete', 'hello'), ('Content-type', 'text/plain')]
path = env['PATH_INFO'].lstrip('/')
if not path:
req.response(400, body='')
return
start_response("400 Bad Request", headers)
return [""]
try:
del self.stuff[path]
req.response(204)
start_response("204 No Content", headers)
except KeyError:
req.response(404)
req.write('')
start_response("404 Not Found", headers)
return [""]
def handle_post(self, req):
req.set_header('x-post', 'hello')
req.write(req.read_body())
def handle_post(self, env, start_response):
headers = [('x-post', 'hello'), ('Content-type', 'text/plain')]
start_response("200 OK", headers)
return [env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))]
class TestBase(object):
@@ -109,7 +122,7 @@ class TestBase(object):
def setUp(self):
self.logfile = StringIO()
self.victim = api.spawn(httpd.server,
self.victim = api.spawn(wsgi.server,
api.tcp_listener(('0.0.0.0', 31337)),
self.site_class(),
log=self.logfile,
@@ -211,45 +224,44 @@ class TestHttpc(TestBase, tests.TestCase):
class RedirectSite(BasicSite):
response_code = 301
response_code = "301 Moved Permanently"
def __call__(self, env, start_response):
path = env['PATH_INFO']
if path.startswith('/redirect/'):
url = 'http://' + env['HTTP_HOST'] + path.replace('/redirect/', '/')
start_response(self.response_code, [("Location", url)])
return [""]
return super(RedirectSite, self).__call__(env, start_response)
def handle_request(self, req):
if req.path().startswith('/redirect/'):
url = ('http://' + req.get_header('host') +
req.uri().replace('/redirect/', '/'))
req.response(self.response_code, headers={'location': url},
body='')
return
return Site.handle_request(self, req)
class Site301(RedirectSite):
pass
class Site302(BasicSite):
def handle_request(self, req):
if req.path().startswith('/expired/'):
url = ('http://' + req.get_header('host') +
req.uri().replace('/expired/', '/'))
headers = {'location': url, 'expires': '0'}
req.response(302, headers=headers, body='')
return
if req.path().startswith('/expires/'):
url = ('http://' + req.get_header('host') +
req.uri().replace('/expires/', '/'))
def __call__(self, env, start_response):
path = env['PATH_INFO']
if path.startswith('/expired/'):
url = 'http://' + env['HTTP_HOST'] + path.replace('/expired/', '/')
headers = [('location', url), ('expires', '0')]
start_response("302 Found", headers)
return [""]
if path.startswith('/expires/'):
url = 'http://' + env['HTTP_HOST'] + path.replace('/expires/', '/')
expires = time.time() + (100 * 24 * 60 * 60)
headers = {'location': url, 'expires': httpc.to_http_time(expires)}
req.response(302, headers=headers, body='')
return
return Site.handle_request(self, req)
headers = [('location', url), ('expires', httpc.to_http_time(expires))]
start_response("302 Found", headers)
return [""]
return super(Site302, self).__call__(env, start_response)
class Site303(RedirectSite):
response_code = 303
response_code = "303 See Other"
class Site307(RedirectSite):
response_code = 307
response_code = "307 Temporary Redirect"
class TestHttpc301(TestBase, tests.TestCase):
@@ -332,15 +344,9 @@ class TestHttpc307(TestBase, tests.TestCase):
class Site500(BasicSite):
def handle_request(self, req):
req.response(500, body="screw you world")
return
class Site500(BasicSite):
def handle_request(self, req):
req.response(500, body="screw you world")
return
def __call__(self, env, start_response):
start_response("500 Internal Server Error", [("Content-type", "text/plain")])
return ["screw you world"]
class TestHttpc500(TestBase, tests.TestCase):
@@ -361,8 +367,9 @@ class TestHttpc500(TestBase, tests.TestCase):
class Site504(BasicSite):
def handle_request(self, req):
req.response(504, body="screw you world")
def __call__(self, env, start_response):
start_response("504 Gateway Timeout", [("Content-type", "text/plain")])
return ["screw you world"]
class TestHttpc504(TestBase, tests.TestCase):

View File

@@ -22,10 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import time
import unittest
from eventlet import api
from eventlet import channel
from eventlet import coros
from eventlet import pools
@@ -65,13 +66,19 @@ class TestIntPool(unittest.TestCase):
self.assertEquals(self.pool.free(), 4)
def test_exhaustion(self):
waiter = channel.channel()
waiter = coros.event()
def consumer():
gotten = None
cancel = api.exc_after(1, api.TimeoutError)
try:
print time.asctime(), "getting"
gotten = self.pool.get()
print time.asctime(), "got"
finally:
cancel.cancel()
print "waiter send"
waiter.send(gotten)
print "waiter sent"
api.spawn(consumer)
@@ -82,14 +89,17 @@ class TestIntPool(unittest.TestCase):
# Let consumer run; nothing will be in the pool, so he will wait
api.sleep(0)
print "put in pool", one
# Wake consumer
self.pool.put(one)
print "done put"
# wait for the consumer
self.assertEquals(waiter.receive(), one)
self.assertEquals(waiter.wait(), one)
print "done wait"
def test_blocks_on_pool(self):
waiter = channel.channel()
waiter = coros.event()
def greedy():
self.pool.get()
self.pool.get()
@@ -98,7 +108,9 @@ class TestIntPool(unittest.TestCase):
# No one should be waiting yet.
self.assertEquals(self.pool.waiting(), 0)
# The call to the next get will unschedule this routine.
print "calling get"
self.pool.get()
print "called get"
# So this send should never be called.
waiter.send('Failed!')
@@ -113,8 +125,8 @@ class TestIntPool(unittest.TestCase):
## Greedy should be blocking on the last get
self.assertEquals(self.pool.waiting(), 1)
## Send will never be called, so balance should be 0.
self.assertEquals(waiter.balance, 0)
## Send will never be called, so the event should not be ready.
self.assertEquals(waiter.ready(), False)
api.kill(killable)
@@ -140,38 +152,6 @@ class TestIntPool2(unittest.TestCase):
gotten = self.pool.get()
self.assertEquals(gotten, 1)
ALWAYS = RuntimeError('I always fail')
SOMETIMES = RuntimeError('I fail half the time')
class TestFan(unittest.TestCase):
mode = 'static'
def setUp(self):
self.pool = IntPool(max_size=2)
def test_with_list(self):
list_of_input = ['agent-one', 'agent-two', 'agent-three']
def my_callable(pool_item, next_thing):
## Do some "blocking" (yielding) thing
api.sleep(0.01)
return next_thing
output = self.pool.fan(my_callable, list_of_input)
self.assertEquals(list_of_input, output)
def test_all_fail(self):
def my_failure(pool_item, next_thing):
raise ALWAYS
self.assertRaises(pools.AllFailed, self.pool.fan, my_failure, range(4))
def test_some_fail(self):
def my_failing_callable(pool_item, next_thing):
if next_thing % 2:
raise SOMETIMES
return next_thing
self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4))
if __name__ == '__main__':

View File

@@ -23,7 +23,6 @@ THE SOFTWARE.
"""
from greentest import tests
from eventlet import api
from eventlet import channel
from eventlet import processes
class TestEchoPool(tests.TestCase):

View File

@@ -225,7 +225,7 @@ class TestSaranwrap(unittest.TestCase):
tid = make_uuid()
self.assertEqual(tid.get_version(), uuid.uuid4().get_version())
def make_list():
from eventlet import saranwrap_test
from greentest import saranwrap_test
prox = saranwrap.wrap(saranwrap_test.list_maker)
# after this function returns, prox should fall out of scope
return prox()
@@ -270,7 +270,7 @@ sys_path = sys.path""")
sys.path.remove(temp_dir)
def test_contention(self):
from eventlet import saranwrap_test
from greentest import saranwrap_test
prox = saranwrap.wrap(saranwrap_test)
pool = coros.CoroutinePool(max_size=4)
@@ -296,7 +296,7 @@ sys_path = sys.path""")
def test_list_of_functions(self):
return # this test is known to fail, we can implement it sometime in the future if we wish
from eventlet import saranwrap_test
from greentest import saranwrap_test
prox = saranwrap.wrap([saranwrap_test.list_maker])
self.assertEquals(list_maker(), prox[0]())