[svn r93] "nonblocking" support in tpool, to be exploited by dbmgr, so that

nonblocking DB calls don't need to context-switch to a native thread.
This commit is contained in:
seeping.blister
2008-02-19 18:24:32 -05:00
parent 1c25f1c91a
commit 48321f90d1
3 changed files with 28 additions and 9 deletions

View File

@@ -157,7 +157,7 @@ class GenericConnectionWrapper(object):
def character_set_name(self,*args, **kwargs): return self._base.character_set_name(*args, **kwargs) def character_set_name(self,*args, **kwargs): return self._base.character_set_name(*args, **kwargs)
def close(self,*args, **kwargs): return self._base.close(*args, **kwargs) def close(self,*args, **kwargs): return self._base.close(*args, **kwargs)
def commit(self,*args, **kwargs): return self._base.commit(*args, **kwargs) def commit(self,*args, **kwargs): return self._base.commit(*args, **kwargs)
def cursor(self, cursorclass=None): return self._base.cursor(cursorclass) def cursor(self, cursorclass=None, **kwargs): return self._base.cursor(cursorclass, **kwargs)
def dump_debug_info(self,*args, **kwargs): return self._base.dump_debug_info(*args, **kwargs) def dump_debug_info(self,*args, **kwargs): return self._base.dump_debug_info(*args, **kwargs)
def errno(self,*args, **kwargs): return self._base.errno(*args, **kwargs) def errno(self,*args, **kwargs): return self._base.errno(*args, **kwargs)
def error(self,*args, **kwargs): return self._base.error(*args, **kwargs) def error(self,*args, **kwargs): return self._base.error(*args, **kwargs)

View File

@@ -495,6 +495,10 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
try: try:
self.server.site.handle_request(request) self.server.site.handle_request(request)
except ErrorResponse, err: except ErrorResponse, err:
import sys, traceback
(a,b,tb) = sys.exc_info()
traceback.print_exception(ErrorResponse,err,tb)
request.response(code=err.code, request.response(code=err.code,
reason_phrase=err.reason, reason_phrase=err.reason,
headers=err.headers, headers=err.headers,
@@ -528,6 +532,11 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
raise raise
self.socket.close() self.socket.close()
def tid():
n = long(id(api.getcurrent()))
if n < 0:
n = -n
return hex(n)
class Server(BaseHTTPServer.HTTPServer): class Server(BaseHTTPServer.HTTPServer):
def __init__(self, socket, address, site, log, max_http_version=DEFAULT_MAX_HTTP_VERSION): def __init__(self, socket, address, site, log, max_http_version=DEFAULT_MAX_HTTP_VERSION):
@@ -543,7 +552,7 @@ class Server(BaseHTTPServer.HTTPServer):
self.log = self self.log = self
def write(self, something): def write(self, something):
sys.stdout.write('%s' % (something, )) sys.stdout.write('%s: %s' % (tid(), something, )); sys.stdout.flush()
def log_message(self, message): def log_message(self, message):
self.log.write(message) self.log.write(message)

View File

@@ -24,6 +24,7 @@ from sys import stdout
from Queue import Empty, Queue from Queue import Empty, Queue
from eventlet import api, coros, httpc, httpd, util, wrappedfd from eventlet import api, coros, httpc, httpd, util, wrappedfd
from eventlet.api import trampoline, get_hub
_rpipe, _wpipe = os.pipe() _rpipe, _wpipe = os.pipe()
_rfile = os.fdopen(_rpipe,"r",0) _rfile = os.fdopen(_rpipe,"r",0)
@@ -36,10 +37,10 @@ def _signal_t2e():
_reqq = Queue(maxsize=-1) _reqq = Queue(maxsize=-1)
_rspq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1)
def trampoline(): def tpool_trampoline():
global _reqq, _rspq global _reqq, _rspq
while(True): while(True):
_c = _wrap_rfile.read(1) _c = _wrap_rfile.recv(1)
assert(_c != "") assert(_c != "")
while not _rspq.empty(): while not _rspq.empty():
try: try:
@@ -62,14 +63,20 @@ def tworker():
try: try:
rv = meth(*args,**kwargs) rv = meth(*args,**kwargs)
except Exception,exn: except Exception,exn:
rv = exn import sys, traceback
(a,b,tb) = sys.exc_info()
rv = (exn,a,b,tb)
_rspq.put((e,rv)) _rspq.put((e,rv))
_signal_t2e() _signal_t2e()
def erecv(e): def erecv(e):
rv = e.wait() rv = e.wait()
if isinstance(rv,Exception): if isinstance(rv,tuple) and len(rv) == 4 and isinstance(rv[0],Exception):
raise rv import sys, traceback
(e,a,b,tb) = rv
traceback.print_exception(Exception,e,tb)
traceback.print_stack()
raise e
return rv return rv
def erpc(meth,*args, **kwargs): def erpc(meth,*args, **kwargs):
@@ -92,6 +99,9 @@ class Proxy(object):
if not callable(f): if not callable(f):
return f return f
def doit(*args, **kwargs): def doit(*args, **kwargs):
if kwargs.pop('nonblocking',False):
rv = f(*args, **kwargs)
else:
rv = erpc(f,*args,**kwargs) rv = erpc(f,*args,**kwargs)
if type(rv) in self._autowrap: if type(rv) in self._autowrap:
return Proxy(rv) return Proxy(rv)
@@ -108,6 +118,6 @@ def setup():
_threads[i].setDaemon(True) _threads[i].setDaemon(True)
_threads[i].start() _threads[i].start()
api.spawn(trampoline) api.spawn(tpool_trampoline)
setup() setup()