This commit is contained in:
donovan
2008-05-09 09:19:58 -07:00
9 changed files with 307 additions and 123 deletions

View File

@@ -76,7 +76,9 @@ class SocketConsole(greenlib.GreenletContext):
def backdoor((conn, addr), locals=None): def backdoor((conn, addr), locals=None):
host, port = addr host, port = addr
print "backdoor to %s:%s" % (host, port) print "backdoor to %s:%s" % (host, port)
ctx = SocketConsole(conn) fl = conn.makefile("rw")
fl.newlines = '\n'
ctx = SocketConsole(fl)
ctx.register() ctx.register()
try: try:
console = InteractiveConsole(locals) console = InteractiveConsole(locals)

View File

@@ -124,6 +124,14 @@ class event(object):
raise self._exc raise self._exc
return self._result return self._result
def ready(self):
""" Return true if the wait() call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling ready() until one returns True, and then
you can wait() on that one."""
return self._result is not NOT_USED
def cancel(self, waiter): def cancel(self, waiter):
"""Raise an exception into a coroutine which called """Raise an exception into a coroutine which called
wait() an this event instead of returning a value wait() an this event instead of returning a value
@@ -341,6 +349,10 @@ class CoroutinePool(pools.Pool):
""" """
self._execute(None, func, args, kw) self._execute(None, func, args, kw)
def killall(self):
for g in self._greenlets:
api.kill(g)
class pipe(object): class pipe(object):
""" Implementation of pipe using events. Not tested! Not used, either.""" """ Implementation of pipe using events. Not tested! Not used, either."""

View File

@@ -21,12 +21,19 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
from eventlet.api import exc_after, TimeoutError, trampoline, get_hub from eventlet.api import exc_after, TimeoutError, trampoline, get_hub
from eventlet import util from eventlet import util
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
import socket, errno import errno
import os
import socket
import fcntl
from errno import EWOULDBLOCK, EAGAIN from errno import EWOULDBLOCK, EAGAIN
@@ -95,7 +102,7 @@ def socket_send(descriptor, data):
try: try:
return descriptor.send(data) return descriptor.send(data)
except socket.error, e: except socket.error, e:
if e[0] == errno.EWOULDBLOCK: if e[0] == errno.EWOULDBLOCK or e[0] == errno.ENOTCONN:
return 0 return 0
raise raise
except util.SSL.WantWriteError: except util.SSL.WantWriteError:
@@ -165,10 +172,22 @@ def file_send(fd, data):
written = 0 written = 0
def set_nonblocking(fd):
## Socket
if hasattr(fd, 'setblocking'):
fd.setblocking(0)
## File
else:
fileno = fd.fileno()
flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
class GreenSocket(object): class GreenSocket(object):
is_secure = False is_secure = False
timeout = None timeout = None
def __init__(self, fd): def __init__(self, fd):
set_nonblocking(fd)
self.fd = fd self.fd = fd
self._fileno = fd.fileno() self._fileno = fd.fileno()
self.sendcount = 0 self.sendcount = 0
@@ -182,7 +201,7 @@ class GreenSocket(object):
res = socket_accept(fd) res = socket_accept(fd)
if res is not None: if res is not None:
client, addr = res client, addr = res
util.set_nonblocking(client) set_nonblocking(client)
return type(self)(client), addr return type(self)(client), addr
trampoline(fd, read=True, write=True) trampoline(fd, read=True, write=True)
@@ -215,7 +234,7 @@ class GreenSocket(object):
def dup(self, *args, **kw): def dup(self, *args, **kw):
sock = self.fd.dup(*args, **kw) sock = self.fd.dup(*args, **kw)
util.set_nonblocking(sock) set_nonblocking(sock)
return type(self)(sock) return type(self)(sock)
def fileno(self, *args, **kw): def fileno(self, *args, **kw):
@@ -321,6 +340,10 @@ class GreenFile(object):
mode = 'wb+' mode = 'wb+'
def __init__(self, fd): def __init__(self, fd):
if isinstance(fd, GreenSocket):
set_nonblocking(fd.fd)
else:
set_nonblocking(fd)
self.sock = fd self.sock = fd
self.closed = False self.closed = False
@@ -410,6 +433,7 @@ class GreenPipeSocket(GreenSocket):
class GreenPipe(GreenFile): class GreenPipe(GreenFile):
def __init__(self, fd): def __init__(self, fd):
set_nonblocking(fd)
self.fd = GreenPipeSocket(fd) self.fd = GreenPipeSocket(fd)
super(GreenPipe, self).__init__(self.fd) super(GreenPipe, self).__init__(self.fd)

View File

@@ -23,31 +23,150 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
from os.path import abspath, dirname
import sys
import traceback
sys.stdout = sys.stderr
mydir = dirname(dirname(dirname(abspath(__file__))))
if mydir not in sys.path:
sys.path.append(mydir)
from eventlet import api
from eventlet import greenlib from eventlet import greenlib
from eventlet import httpc
from eventlet.hubs import hub from eventlet.hubs import hub
from eventlet import util
util.wrap_socket_with_coroutine_socket()
def hello_world(env, start_response):
result = httpc.get('http://www.google.com/')
start_response('200 OK', [('Content-type', 'text/plain')])
return [result]
def wrap_application(master, env, start_response):
try:
real_application = api.named(env['eventlet_nginx_wsgi_app'])
except:
real_application = hello_world
result = real_application(env, start_response)
master.switch((result, None))
return None, None
class StartResponse(object):
def __call__(self, *args):
self.args = args
pythonpath_already_set = False
WSGI_POLLIN = 0x01 WSGI_POLLIN = 0x01
WSGI_POLLOUT = 0x04 WSGI_POLLOUT = 0x04
import traceback
class Hub(hub.BaseHub): class Hub(hub.BaseHub):
def add_descriptor(self, fileno, read=None, write=None, exc=None): def __init__(self, *args, **kw):
super(Hub, self).add_descriptor(fileno, read, write, exc) hub.BaseHub.__init__(self, *args, **kw)
self._connection_wrappers = {}
if read is not None: def add_descriptor(self, fileno, read=None, write=None, exc=None):
self.poll_register(fileno, WSGI_POLLIN) print "ADD DESCRIPTOR", fileno, read, write, exc
elif write is not None: traceback.print_stack()
self.poll_register(fileno, WSGI_POLLOUT)
super(Hub, self).add_descriptor(fileno, read, write, exc)
flag = 0
if read:
flag |= WSGI_POLLIN
if write:
flag |= WSGI_POLLOUT
conn = self.connection_wrapper(fileno)
self._connection_wrappers[fileno] = conn
print "POLL REGISTER", flag
self.poll_register(conn, flag)
def remove_descriptor(self, fileno): def remove_descriptor(self, fileno):
super(Hub, self).remove_descriptor(fileno) super(Hub, self).remove_descriptor(fileno)
self.poll_unregister(fileno) try:
self.poll_unregister(self._connection_wrappers[fileno])
except RuntimeError:
pass
def wait(self, seconds=None): def wait(self, seconds=0):
if seconds is not None: to_call = getattr(self, 'to_call', None)
self.sleep(int(seconds*1000)) print "WAIT", self, to_call
if to_call:
print "CALL TOCALL"
result = to_call[0](to_call[1])
del self.to_call
return result
greenlib.switch(self.current_application, self.poll(int(seconds*1000)))
greenlib.switch(self.current_application) def application(self, env, start_response):
print "ENV",env
self.poll_register = env['ngx.poll_register']
self.poll_unregister = env['ngx.poll_unregister']
self.poll = env['ngx.poll']
self.connection_wrapper = env['ngx.connection_wrapper']
self.current_application = api.getcurrent()
slave = api.greenlet.greenlet(wrap_application)
response = StartResponse()
result = slave.switch(
api.getcurrent(), env, response)
while True:
self.current_application = api.getcurrent()
print "RESULT", result, callable(result[0])
if result and callable(result[0]):
print "YIELDING!"
yield ''
print "AFTER YIELD!"
conn, flags = result[0]()
fileno = conn.fileno()
if flags & WSGI_POLLIN:
self.readers[fileno](fileno)
elif flags & WSGI_POLLOUT:
self.writers[fileno](fileno)
print "POLL STATE", conn, flags, dir(conn)
else:
start_response(*response.args)
if isinstance(result, tuple):
for x in result[0]:
yield x
else:
for x in result:
yield x
return
result = self.switch()
if not isinstance(result, tuple):
result = (result, None) ## TODO Fix greenlib's return values
def application(env, start_response):
hub = api.get_hub()
if not isinstance(hub, Hub):
api.use_hub(sys.modules[Hub.__module__])
hub = api.get_hub()
global pythonpath_already_set
if not pythonpath_already_set:
pythonpath = env.get('eventlet_python_path', '').split(':')
for seg in pythonpath:
if seg not in sys.path:
sys.path.append(seg)
return hub.application(env, start_response)

View File

@@ -21,19 +21,63 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import errno
import os import os
import popen2 import popen2
import signal import signal
import sys
from eventlet import coros
from eventlet import util, pools from eventlet import pools
from eventlet import greenio from eventlet import greenio
class DeadProcess(RuntimeError): class DeadProcess(RuntimeError):
pass pass
CHILD_PIDS = []
CHILD_EVENTS = {}
def sig_child(signal, frame):
for child_pid in CHILD_PIDS:
try:
pid, code = os.waitpid(child_pid, os.WNOHANG)
if not pid:
continue ## Wasn't this one that died
elif pid == -1:
print >> sys.stderr, "Got -1! Why didn't python raise?"
elif pid != child_pid:
print >> sys.stderr, "pid (%d) != child_pid (%d)" % (pid, child_pid)
# Defensively assume we could get a different pid back
if CHILD_EVENTS.get(pid):
event = CHILD_EVENTS.pop(pid)
event.send(code)
except OSError, e:
if e[0] != errno.ECHILD:
raise e
elif CHILD_EVENTS.get(child_pid):
# Already dead; signal, but assume success
event = CHILD_EVENTS.pop(child_pid)
event.send(0)
signal.signal(signal.SIGCHLD, sig_child)
def _add_child_pid(pid):
"""Add the given integer 'pid' to the list of child
process ids we are tracking. Return an event object
that can be used to get the process' exit code.
"""
CHILD_PIDS.append(pid)
event = coros.event()
CHILD_EVENTS[pid] = event
return event
class Process(object): class Process(object):
process_number = 0 process_number = 0
def __init__(self, command, args, dead_callback=lambda:None): def __init__(self, command, args, dead_callback=lambda:None):
@@ -51,10 +95,11 @@ class Process(object):
## We use popen4 so that read() will read from either stdout or stderr ## We use popen4 so that read() will read from either stdout or stderr
self.popen4 = popen2.Popen4([self.command] + self.args) self.popen4 = popen2.Popen4([self.command] + self.args)
self.event = _add_child_pid(self.popen4.pid)
child_stdout_stderr = self.popen4.fromchild child_stdout_stderr = self.popen4.fromchild
child_stdin = self.popen4.tochild child_stdin = self.popen4.tochild
util.set_nonblocking(child_stdout_stderr) greenio.set_nonblocking(child_stdout_stderr)
util.set_nonblocking(child_stdin) greenio.set_nonblocking(child_stdin)
self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr) self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr)
self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes
self.child_stdin = greenio.GreenPipe(child_stdin) self.child_stdin = greenio.GreenPipe(child_stdin)
@@ -116,6 +161,9 @@ class Process(object):
def getpid(self): def getpid(self):
return self.popen4.pid return self.popen4.pid
def wait(self):
return self.event.wait()
class ProcessPool(pools.Pool): class ProcessPool(pools.Pool):
def __init__(self, command, args=None, min_size=0, max_size=4): def __init__(self, command, args=None, min_size=0, max_size=4):

View File

@@ -1,67 +0,0 @@
import sys
import traceback
sys.path.insert(0, '/Users/donovan/Code/mulib-hg')
sys.stdout = sys.stderr
from eventlet import api
from eventlet import httpc
from eventlet.hubs import nginx
def old_real_application(env, start_response):
#result = httpc.get('http://127.0.0.1:8081/')
start_response('200 OK', [('Content-type', 'text/plain')])
#sys.stderr.write("RESULT %r" % (result, ))
return 'hello'
def wrap_application(master, env, start_response):
real_application = api.named(env['eventlet_nginx_wsgi_app'])
result = real_application(env, start_response)
## Should catch exception and return here?
#sys.stderr.write("RESULT2 %r" % (result, ))
master.switch((result, None))
return None, None
class StartResponse(object):
def __init__(self, start_response):
self.start_response = start_response
def __call__(self, *args):
self.args = args
def application(env, start_response):
hub = api.get_hub()
if not isinstance(hub, nginx.Hub):
api.use_hub(nginx)
hub.poll_register = env['ngx.poll_register']
hub.poll_unregister = env['ngx.poll_unregister']
hub.sleep = env['ngx.sleep']
hub.current_application = api.getcurrent()
slave = api.greenlet.greenlet(wrap_application)
response = StartResponse(start_response)
result = slave.switch(
hub.current_application, env, response)
while True:
#sys.stderr.write("RESULT3 %r" % (result, ))
if result is None or result == (None, None):
yield ''
else:
start_response(*response.args)
if isinstance(result, tuple):
for x in result[0]:
yield x
else:
for x in result:
yield x
return
result = hub.switch()

View File

@@ -17,22 +17,24 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import os, socket, time, threading import os, threading
import Queue import Queue
from sys import stdout from sys import stdout
from Queue import Empty, Queue from Queue import Empty, Queue
from eventlet import api, coros, httpc, httpd, util, greenio from eventlet import api, coros, httpc, httpd, greenio
from eventlet.api import trampoline, get_hub from eventlet.api import trampoline, get_hub
_rpipe, _wpipe = os.pipe() _rpipe, _wpipe = os.pipe()
_rfile = os.fdopen(_rpipe,"r",0) _rfile = os.fdopen(_rpipe,"r",0)
_wrap_rfile = greenio.GreenPipe(_rfile) ## Work whether or not wrap_pipe_with_coroutine_pipe was called
util.set_nonblocking(_rfile) if not isinstance(_rfile, greenio.GreenPipe):
_rfile = greenio.GreenPipe(_rfile)
def _signal_t2e(): def _signal_t2e():
nwritten = os.write(_wpipe,' ') nwritten = greenio.__original_write__(_wpipe,' ')
_reqq = Queue(maxsize=-1) _reqq = Queue(maxsize=-1)
_rspq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1)
@@ -79,11 +81,18 @@ def erecv(e):
raise e raise e
return rv return rv
def erpc(meth,*args, **kwargs): def execute(meth,*args, **kwargs):
"""Execute method in a thread, blocking the current
coroutine until the method completes.
"""
e = esend(meth,*args,**kwargs) e = esend(meth,*args,**kwargs)
rv = erecv(e) rv = erecv(e)
return rv return rv
## TODO deprecate
erpc = execute
class Proxy(object): class Proxy(object):
""" a simple proxy-wrapper of any object that comes with a methods-only interface, """ a simple proxy-wrapper of any object that comes with a methods-only interface,
in order to forward every method invocation onto a thread in the native-thread pool. in order to forward every method invocation onto a thread in the native-thread pool.
@@ -102,9 +111,9 @@ class Proxy(object):
if kwargs.pop('nonblocking',False): if kwargs.pop('nonblocking',False):
rv = f(*args, **kwargs) rv = f(*args, **kwargs)
else: else:
rv = erpc(f,*args,**kwargs) rv = execute(f,*args,**kwargs)
if type(rv) in self._autowrap: if type(rv) in self._autowrap:
return Proxy(rv) return Proxy(rv, self._autowrap)
else: else:
return rv return rv
return doit return doit

View File

@@ -24,7 +24,6 @@ THE SOFTWARE.
""" """
import os import os
import fcntl
import socket import socket
import errno import errno
@@ -67,7 +66,6 @@ __original_socket__ = socket.socket
def tcp_socket(): def tcp_socket():
s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM) s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM)
set_nonblocking(s)
return s return s
@@ -90,6 +88,7 @@ def wrap_ssl(sock, certificate=None, private_key=None):
connection.set_connect_state() connection.set_connect_state()
return greenio.GreenSSL(connection) return greenio.GreenSSL(connection)
socket_already_wrapped = False socket_already_wrapped = False
def wrap_socket_with_coroutine_socket(): def wrap_socket_with_coroutine_socket():
global socket_already_wrapped global socket_already_wrapped
@@ -98,9 +97,7 @@ def wrap_socket_with_coroutine_socket():
def new_socket(*args, **kw): def new_socket(*args, **kw):
from eventlet import greenio from eventlet import greenio
s = __original_socket__(*args, **kw) return greenio.GreenSocket(__original_socket__(*args, **kw))
set_nonblocking(s)
return greenio.GreenSocket(s)
socket.socket = new_socket socket.socket = new_socket
socket.ssl = wrap_ssl socket.ssl = wrap_ssl
@@ -108,6 +105,53 @@ def wrap_socket_with_coroutine_socket():
socket_already_wrapped = True socket_already_wrapped = True
__original_fdopen__ = os.fdopen
__original_read__ = os.read
__original_write__ = os.write
__original_waitpid__ = os.waitpid
__original_fork__ = os.fork
## TODO wrappings for popen functions? not really needed since Process object exists?
pipes_already_wrapped = False
def wrap_pipes_with_coroutine_pipes():
from eventlet import processes ## Make sure the signal handler is installed
global pipes_already_wrapped
if pipes_already_wrapped:
return
def new_fdopen(*args, **kw):
from eventlet import greenio
return greenio.GreenPipe(__original_fdopen__(*args, **kw))
def new_read(fd, *args, **kw):
from eventlet import api
api.trampoline(fd, read=True)
return __original_read__(fd, *args, **kw)
def new_write(fd, *args, **kw):
from eventlet import api
api.trampoline(fd, write=True)
return __original_write__(fd, *args, **kw)
def new_fork(*args, **kwargs):
pid = __original_fork__
if pid:
processes._add_child_pid(pid)
return pid
def new_waitpid(pid, options):
from eventlet import processes
evt = processes.CHILD_EVENTS[pid]
if options == os.WNOHANG:
if evt.ready():
return pid, evt.wait()
return 0, 0
elif options:
return __original_waitpid__(pid, result)
return pid, evt.wait()
os.fdopen = new_fdopen
os.read = new_read
os.write = new_write
os.fork = new_fork
os.waitpid = new_waitpid
def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):
set_reuse_addr(descriptor) set_reuse_addr(descriptor)
descriptor.bind(addr) descriptor.bind(addr)
@@ -125,17 +169,3 @@ def set_reuse_addr(descriptor):
except socket.error: except socket.error:
pass pass
def set_nonblocking(descriptor):
if hasattr(descriptor, 'setblocking'):
# socket
descriptor.setblocking(0)
else:
# fd
if hasattr(descriptor, 'fileno'):
fd = descriptor.fileno()
else:
fd = descriptor
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
return descriptor

View File

@@ -35,6 +35,10 @@ import BaseHTTPServer
from eventlet import api from eventlet import api
from eventlet.httpdate import format_date_time from eventlet.httpdate import format_date_time
from eventlet import pools
DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
@@ -276,7 +280,6 @@ class Server(BaseHTTPServer.HTTPServer):
self.address = address self.address = address
if log: if log:
self.log = log self.log = log
log.write = log.info
else: else:
self.log = sys.stderr self.log = sys.stderr
self.app = app self.app = app
@@ -305,13 +308,17 @@ class Server(BaseHTTPServer.HTTPServer):
self.log.write(message + '\n') self.log.write(message + '\n')
def server(sock, site, log=None, environ=None, max_size=None, max_http_version=DEFAULT_MAX_HTTP_VERSION): def server(sock, site, log=None, environ=None, max_size=None, max_http_version=DEFAULT_MAX_HTTP_VERSION):
serv = Server(sock, sock.getsockname(), site, log, environ=None, max_http_version=max_http_version) serv = Server(sock, sock.getsockname(), site, log, environ=None, max_http_version=max_http_version)
if max_size is None:
max_size = DEFAULT_MAX_SIMULTANEOUS_REQUESTS
pool = pools.CoroutinePool(max_size=max_size)
try: try:
print "wsgi starting up on", sock.getsockname() print "wsgi starting up on", sock.getsockname()
while True: while True:
try: try:
api.spawn(serv.process_request, sock.accept()) pool.execute_async(lambda: serv.process_request(sock.accept()))
except KeyboardInterrupt: except KeyboardInterrupt:
api.get_hub().remove_descriptor(sock.fileno()) api.get_hub().remove_descriptor(sock.fileno())
print "wsgi exiting" print "wsgi exiting"