diff --git a/eventlet/backdoor.py b/eventlet/backdoor.py index 8f792dd..50b5d7d 100644 --- a/eventlet/backdoor.py +++ b/eventlet/backdoor.py @@ -76,7 +76,9 @@ class SocketConsole(greenlib.GreenletContext): def backdoor((conn, addr), locals=None): host, port = addr print "backdoor to %s:%s" % (host, port) - ctx = SocketConsole(conn) + fl = conn.makefile("rw") + fl.newlines = '\n' + ctx = SocketConsole(fl) ctx.register() try: console = InteractiveConsole(locals) diff --git a/eventlet/coros.py b/eventlet/coros.py index 0fdb06d..4276e09 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -124,6 +124,14 @@ class event(object): raise self._exc return self._result + def ready(self): + """ Return true if the wait() call will return immediately. + Used to avoid waiting for things that might take a while to time out. + For example, you can put a bunch of events into a list, and then visit + them all repeatedly, calling ready() until one returns True, and then + you can wait() on that one.""" + return self._result is not NOT_USED + def cancel(self, waiter): """Raise an exception into a coroutine which called wait() an this event instead of returning a value @@ -341,6 +349,10 @@ class CoroutinePool(pools.Pool): """ self._execute(None, func, args, kw) + def killall(self): + for g in self._greenlets: + api.kill(g) + class pipe(object): """ Implementation of pipe using events. Not tested! Not used, either.""" diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 9257a55..fe20a57 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -21,12 +21,19 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + from eventlet.api import exc_after, TimeoutError, trampoline, get_hub from eventlet import util + BUFFER_SIZE = 4096 -import socket, errno +import errno +import os +import socket +import fcntl + + from errno import EWOULDBLOCK, EAGAIN @@ -95,7 +102,7 @@ def socket_send(descriptor, data): try: return descriptor.send(data) except socket.error, e: - if e[0] == errno.EWOULDBLOCK: + if e[0] == errno.EWOULDBLOCK or e[0] == errno.ENOTCONN: return 0 raise except util.SSL.WantWriteError: @@ -163,26 +170,38 @@ def file_send(fd, data): except socket.error, e: if e[0] == errno.EPIPE: written = 0 - + + +def set_nonblocking(fd): + ## Socket + if hasattr(fd, 'setblocking'): + fd.setblocking(0) + ## File + else: + fileno = fd.fileno() + flags = fcntl.fcntl(fileno, fcntl.F_GETFL) + fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) + class GreenSocket(object): is_secure = False timeout = None def __init__(self, fd): + set_nonblocking(fd) self.fd = fd self._fileno = fd.fileno() self.sendcount = 0 self.recvcount = 0 self.recvbuffer = '' self.closed = False - + def accept(self): fd = self.fd while True: res = socket_accept(fd) if res is not None: client, addr = res - util.set_nonblocking(client) + set_nonblocking(client) return type(self)(client), addr trampoline(fd, read=True, write=True) @@ -215,7 +234,7 @@ class GreenSocket(object): def dup(self, *args, **kw): sock = self.fd.dup(*args, **kw) - util.set_nonblocking(sock) + set_nonblocking(sock) return type(self)(sock) def fileno(self, *args, **kw): @@ -321,9 +340,13 @@ class GreenFile(object): mode = 'wb+' def __init__(self, fd): + if isinstance(fd, GreenSocket): + set_nonblocking(fd.fd) + else: + set_nonblocking(fd) self.sock = fd self.closed = False - + def close(self): self.sock.close() self.closed = True @@ -410,6 +433,7 @@ class GreenPipeSocket(GreenSocket): class GreenPipe(GreenFile): def __init__(self, fd): + set_nonblocking(fd) self.fd = GreenPipeSocket(fd) super(GreenPipe, self).__init__(self.fd) diff --git a/eventlet/hubs/nginx.py b/eventlet/hubs/nginx.py index b23dc04..b25ab39 100644 --- a/eventlet/hubs/nginx.py +++ b/eventlet/hubs/nginx.py @@ -23,31 +23,150 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + + +from os.path import abspath, dirname +import sys +import traceback + +sys.stdout = sys.stderr +mydir = dirname(dirname(dirname(abspath(__file__)))) +if mydir not in sys.path: + sys.path.append(mydir) + + +from eventlet import api from eventlet import greenlib +from eventlet import httpc from eventlet.hubs import hub +from eventlet import util + + +util.wrap_socket_with_coroutine_socket() + + +def hello_world(env, start_response): + result = httpc.get('http://www.google.com/') + start_response('200 OK', [('Content-type', 'text/plain')]) + return [result] + + +def wrap_application(master, env, start_response): + try: + real_application = api.named(env['eventlet_nginx_wsgi_app']) + except: + real_application = hello_world + result = real_application(env, start_response) + master.switch((result, None)) + return None, None + + +class StartResponse(object): + def __call__(self, *args): + self.args = args + + +pythonpath_already_set = False + + WSGI_POLLIN = 0x01 WSGI_POLLOUT = 0x04 - +import traceback class Hub(hub.BaseHub): - def add_descriptor(self, fileno, read=None, write=None, exc=None): - super(Hub, self).add_descriptor(fileno, read, write, exc) + def __init__(self, *args, **kw): + hub.BaseHub.__init__(self, *args, **kw) + self._connection_wrappers = {} + + def add_descriptor(self, fileno, read=None, write=None, exc=None): + print "ADD DESCRIPTOR", fileno, read, write, exc + traceback.print_stack() + + super(Hub, self).add_descriptor(fileno, read, write, exc) + flag = 0 + if read: + flag |= WSGI_POLLIN + if write: + flag |= WSGI_POLLOUT + conn = self.connection_wrapper(fileno) + self._connection_wrappers[fileno] = conn + print "POLL REGISTER", flag + self.poll_register(conn, flag) - if read is not None: - self.poll_register(fileno, WSGI_POLLIN) - elif write is not None: - self.poll_register(fileno, WSGI_POLLOUT) - def remove_descriptor(self, fileno): super(Hub, self).remove_descriptor(fileno) - self.poll_unregister(fileno) - - def wait(self, seconds=None): - if seconds is not None: - self.sleep(int(seconds*1000)) + try: + self.poll_unregister(self._connection_wrappers[fileno]) + except RuntimeError: + pass - greenlib.switch(self.current_application) + def wait(self, seconds=0): + to_call = getattr(self, 'to_call', None) + print "WAIT", self, to_call + if to_call: + print "CALL TOCALL" + result = to_call[0](to_call[1]) + del self.to_call + return result + greenlib.switch(self.current_application, self.poll(int(seconds*1000))) + + def application(self, env, start_response): + print "ENV",env + self.poll_register = env['ngx.poll_register'] + self.poll_unregister = env['ngx.poll_unregister'] + self.poll = env['ngx.poll'] + self.connection_wrapper = env['ngx.connection_wrapper'] + self.current_application = api.getcurrent() + + slave = api.greenlet.greenlet(wrap_application) + response = StartResponse() + result = slave.switch( + api.getcurrent(), env, response) + + while True: + self.current_application = api.getcurrent() + print "RESULT", result, callable(result[0]) + if result and callable(result[0]): + print "YIELDING!" + yield '' + print "AFTER YIELD!" + conn, flags = result[0]() + fileno = conn.fileno() + if flags & WSGI_POLLIN: + self.readers[fileno](fileno) + elif flags & WSGI_POLLOUT: + self.writers[fileno](fileno) + print "POLL STATE", conn, flags, dir(conn) + else: + start_response(*response.args) + if isinstance(result, tuple): + for x in result[0]: + yield x + else: + for x in result: + yield x + return + result = self.switch() + if not isinstance(result, tuple): + result = (result, None) ## TODO Fix greenlib's return values + + +def application(env, start_response): + hub = api.get_hub() + + if not isinstance(hub, Hub): + api.use_hub(sys.modules[Hub.__module__]) + hub = api.get_hub() + + global pythonpath_already_set + if not pythonpath_already_set: + pythonpath = env.get('eventlet_python_path', '').split(':') + for seg in pythonpath: + if seg not in sys.path: + sys.path.append(seg) + + return hub.application(env, start_response) diff --git a/eventlet/processes.py b/eventlet/processes.py index 35c7dd1..f00b2ae 100644 --- a/eventlet/processes.py +++ b/eventlet/processes.py @@ -21,19 +21,63 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ - +import errno import os import popen2 import signal +import sys - -from eventlet import util, pools +from eventlet import coros +from eventlet import pools from eventlet import greenio + class DeadProcess(RuntimeError): pass +CHILD_PIDS = [] + +CHILD_EVENTS = {} + + +def sig_child(signal, frame): + for child_pid in CHILD_PIDS: + try: + pid, code = os.waitpid(child_pid, os.WNOHANG) + if not pid: + continue ## Wasn't this one that died + elif pid == -1: + print >> sys.stderr, "Got -1! Why didn't python raise?" + elif pid != child_pid: + print >> sys.stderr, "pid (%d) != child_pid (%d)" % (pid, child_pid) + + # Defensively assume we could get a different pid back + if CHILD_EVENTS.get(pid): + event = CHILD_EVENTS.pop(pid) + event.send(code) + + except OSError, e: + if e[0] != errno.ECHILD: + raise e + elif CHILD_EVENTS.get(child_pid): + # Already dead; signal, but assume success + event = CHILD_EVENTS.pop(child_pid) + event.send(0) +signal.signal(signal.SIGCHLD, sig_child) + + +def _add_child_pid(pid): + """Add the given integer 'pid' to the list of child + process ids we are tracking. Return an event object + that can be used to get the process' exit code. + """ + CHILD_PIDS.append(pid) + event = coros.event() + CHILD_EVENTS[pid] = event + return event + + class Process(object): process_number = 0 def __init__(self, command, args, dead_callback=lambda:None): @@ -51,10 +95,11 @@ class Process(object): ## We use popen4 so that read() will read from either stdout or stderr self.popen4 = popen2.Popen4([self.command] + self.args) + self.event = _add_child_pid(self.popen4.pid) child_stdout_stderr = self.popen4.fromchild child_stdin = self.popen4.tochild - util.set_nonblocking(child_stdout_stderr) - util.set_nonblocking(child_stdin) + greenio.set_nonblocking(child_stdout_stderr) + greenio.set_nonblocking(child_stdin) self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr) self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes self.child_stdin = greenio.GreenPipe(child_stdin) @@ -116,6 +161,9 @@ class Process(object): def getpid(self): return self.popen4.pid + def wait(self): + return self.event.wait() + class ProcessPool(pools.Pool): def __init__(self, command, args=None, min_size=0, max_size=4): diff --git a/eventlet/support/nginx_mod_wsgi.py b/eventlet/support/nginx_mod_wsgi.py deleted file mode 100644 index 1035680..0000000 --- a/eventlet/support/nginx_mod_wsgi.py +++ /dev/null @@ -1,67 +0,0 @@ - -import sys -import traceback - -sys.path.insert(0, '/Users/donovan/Code/mulib-hg') -sys.stdout = sys.stderr - -from eventlet import api -from eventlet import httpc - -from eventlet.hubs import nginx - - -def old_real_application(env, start_response): - #result = httpc.get('http://127.0.0.1:8081/') - start_response('200 OK', [('Content-type', 'text/plain')]) - #sys.stderr.write("RESULT %r" % (result, )) - return 'hello' - - -def wrap_application(master, env, start_response): - real_application = api.named(env['eventlet_nginx_wsgi_app']) - result = real_application(env, start_response) - ## Should catch exception and return here? - #sys.stderr.write("RESULT2 %r" % (result, )) - master.switch((result, None)) - return None, None - - -class StartResponse(object): - def __init__(self, start_response): - self.start_response = start_response - - def __call__(self, *args): - self.args = args - - -def application(env, start_response): - hub = api.get_hub() - - if not isinstance(hub, nginx.Hub): - api.use_hub(nginx) - - hub.poll_register = env['ngx.poll_register'] - hub.poll_unregister = env['ngx.poll_unregister'] - hub.sleep = env['ngx.sleep'] - hub.current_application = api.getcurrent() - - slave = api.greenlet.greenlet(wrap_application) - response = StartResponse(start_response) - result = slave.switch( - hub.current_application, env, response) - - while True: - #sys.stderr.write("RESULT3 %r" % (result, )) - if result is None or result == (None, None): - yield '' - else: - start_response(*response.args) - if isinstance(result, tuple): - for x in result[0]: - yield x - else: - for x in result: - yield x - return - result = hub.switch() diff --git a/eventlet/tpool.py b/eventlet/tpool.py index aa5e4ad..31eeb4a 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -17,22 +17,24 @@ See the License for the specific language governing permissions and limitations under the License. """ -import os, socket, time, threading +import os, threading import Queue from sys import stdout from Queue import Empty, Queue -from eventlet import api, coros, httpc, httpd, util, greenio +from eventlet import api, coros, httpc, httpd, greenio from eventlet.api import trampoline, get_hub _rpipe, _wpipe = os.pipe() _rfile = os.fdopen(_rpipe,"r",0) -_wrap_rfile = greenio.GreenPipe(_rfile) -util.set_nonblocking(_rfile) +## Work whether or not wrap_pipe_with_coroutine_pipe was called +if not isinstance(_rfile, greenio.GreenPipe): + _rfile = greenio.GreenPipe(_rfile) + def _signal_t2e(): - nwritten = os.write(_wpipe,' ') + nwritten = greenio.__original_write__(_wpipe,' ') _reqq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1) @@ -79,11 +81,18 @@ def erecv(e): raise e return rv -def erpc(meth,*args, **kwargs): +def execute(meth,*args, **kwargs): + """Execute method in a thread, blocking the current + coroutine until the method completes. + """ e = esend(meth,*args,**kwargs) rv = erecv(e) return rv +## TODO deprecate +erpc = execute + + class Proxy(object): """ a simple proxy-wrapper of any object that comes with a methods-only interface, in order to forward every method invocation onto a thread in the native-thread pool. @@ -102,9 +111,9 @@ class Proxy(object): if kwargs.pop('nonblocking',False): rv = f(*args, **kwargs) else: - rv = erpc(f,*args,**kwargs) + rv = execute(f,*args,**kwargs) if type(rv) in self._autowrap: - return Proxy(rv) + return Proxy(rv, self._autowrap) else: return rv return doit diff --git a/eventlet/util.py b/eventlet/util.py index acbc126..323d3d6 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -24,7 +24,6 @@ THE SOFTWARE. """ import os -import fcntl import socket import errno @@ -67,7 +66,6 @@ __original_socket__ = socket.socket def tcp_socket(): s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM) - set_nonblocking(s) return s @@ -90,6 +88,7 @@ def wrap_ssl(sock, certificate=None, private_key=None): connection.set_connect_state() return greenio.GreenSSL(connection) + socket_already_wrapped = False def wrap_socket_with_coroutine_socket(): global socket_already_wrapped @@ -98,9 +97,7 @@ def wrap_socket_with_coroutine_socket(): def new_socket(*args, **kw): from eventlet import greenio - s = __original_socket__(*args, **kw) - set_nonblocking(s) - return greenio.GreenSocket(s) + return greenio.GreenSocket(__original_socket__(*args, **kw)) socket.socket = new_socket socket.ssl = wrap_ssl @@ -108,6 +105,53 @@ def wrap_socket_with_coroutine_socket(): socket_already_wrapped = True +__original_fdopen__ = os.fdopen +__original_read__ = os.read +__original_write__ = os.write +__original_waitpid__ = os.waitpid +__original_fork__ = os.fork +## TODO wrappings for popen functions? not really needed since Process object exists? + + +pipes_already_wrapped = False +def wrap_pipes_with_coroutine_pipes(): + from eventlet import processes ## Make sure the signal handler is installed + global pipes_already_wrapped + if pipes_already_wrapped: + return + def new_fdopen(*args, **kw): + from eventlet import greenio + return greenio.GreenPipe(__original_fdopen__(*args, **kw)) + def new_read(fd, *args, **kw): + from eventlet import api + api.trampoline(fd, read=True) + return __original_read__(fd, *args, **kw) + def new_write(fd, *args, **kw): + from eventlet import api + api.trampoline(fd, write=True) + return __original_write__(fd, *args, **kw) + def new_fork(*args, **kwargs): + pid = __original_fork__ + if pid: + processes._add_child_pid(pid) + return pid + def new_waitpid(pid, options): + from eventlet import processes + evt = processes.CHILD_EVENTS[pid] + if options == os.WNOHANG: + if evt.ready(): + return pid, evt.wait() + return 0, 0 + elif options: + return __original_waitpid__(pid, result) + return pid, evt.wait() + os.fdopen = new_fdopen + os.read = new_read + os.write = new_write + os.fork = new_fork + os.waitpid = new_waitpid + + def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): set_reuse_addr(descriptor) descriptor.bind(addr) @@ -124,18 +168,4 @@ def set_reuse_addr(descriptor): ) except socket.error: pass - -def set_nonblocking(descriptor): - if hasattr(descriptor, 'setblocking'): - # socket - descriptor.setblocking(0) - else: - # fd - if hasattr(descriptor, 'fileno'): - fd = descriptor.fileno() - else: - fd = descriptor - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - return descriptor diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index 9e6cc9f..983dda6 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -35,6 +35,10 @@ import BaseHTTPServer from eventlet import api from eventlet.httpdate import format_date_time +from eventlet import pools + + +DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024 DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' @@ -276,7 +280,6 @@ class Server(BaseHTTPServer.HTTPServer): self.address = address if log: self.log = log - log.write = log.info else: self.log = sys.stderr self.app = app @@ -305,13 +308,17 @@ class Server(BaseHTTPServer.HTTPServer): self.log.write(message + '\n') + def server(sock, site, log=None, environ=None, max_size=None, max_http_version=DEFAULT_MAX_HTTP_VERSION): serv = Server(sock, sock.getsockname(), site, log, environ=None, max_http_version=max_http_version) + if max_size is None: + max_size = DEFAULT_MAX_SIMULTANEOUS_REQUESTS + pool = pools.CoroutinePool(max_size=max_size) try: print "wsgi starting up on", sock.getsockname() while True: try: - api.spawn(serv.process_request, sock.accept()) + pool.execute_async(lambda: serv.process_request(sock.accept())) except KeyboardInterrupt: api.get_hub().remove_descriptor(sock.fileno()) print "wsgi exiting"