converted eventlet to use greenlet's switch and throw methods instead of greenlib.switch; greenlib is not used anymore.

This commit is contained in:
Denis Bilenko
2008-11-24 14:30:42 +06:00
parent 5d8f728a10
commit de8f807688
10 changed files with 118 additions and 78 deletions

View File

@@ -31,7 +31,7 @@ import inspect
import traceback
from eventlet.support import greenlet
from eventlet import greenlib, tls
from eventlet import tls
__all__ = [
'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub',
@@ -135,15 +135,15 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError
"""
t = None
hub = get_hub()
self = greenlet.getcurrent()
assert hub.greenlet is not self, 'do not call blocking functions from the mainloop'
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
fileno = getattr(fd, 'fileno', lambda: fd)()
def _do_close(_d, error=None):
greenlib.switch(self, exc=getattr(error, 'value', None)) # convert to socket.error
current.throw(getattr(error, 'value', None)) # convert to socket.error
def _do_timeout():
greenlib.switch(self, exc=timeout_exc())
current.throw(timeout_exc())
def cb(d):
greenlib.switch(self)
current.switch()
# with TwistedHub, descriptor actually an object (socket_rwdescriptor) which stores
# this callback. If this callback stores a reference to the socket instance (fd)
# then descriptor has a reference to that instance. This makes socket not collected
@@ -173,10 +173,10 @@ def get_fileno(obj):
return f()
def select(read_list, write_list, error_list, timeout=None):
self = get_hub()
hub = get_hub()
t = None
current = greenlet.getcurrent()
assert self.greenlet is not current, 'do not call blocking functions from the mainloop'
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
ds = {}
for r in read_list:
ds[get_fileno(r)] = {'read' : r}
@@ -189,33 +189,33 @@ def select(read_list, write_list, error_list, timeout=None):
def on_read(d):
original = ds[get_fileno(d)]['read']
greenlib.switch(current, ([original], [], []))
current.switch(([original], [], []))
def on_write(d):
original = ds[get_fileno(d)]['write']
greenlib.switch(current, ([], [original], []))
current.switch(([], [original], []))
def on_error(d, _err=None):
original = ds[get_fileno(d)]['error']
greenlib.switch(current, ([], [], [original]))
current.switch(([], [], [original]))
def on_timeout():
greenlib.switch(current, ([], [], []))
current.switch(([], [], []))
if timeout is not None:
t = self.schedule_call(timeout, on_timeout)
t = hub.schedule_call(timeout, on_timeout)
try:
for k, v in ds.iteritems():
d = self.add_descriptor(k,
d = hub.add_descriptor(k,
v.get('read') is not None and on_read,
v.get('write') is not None and on_write,
v.get('error') is not None and on_error)
descriptors.append(d)
try:
return self.switch()
return hub.switch()
finally:
for d in descriptors:
self.remove_descriptor(d)
hub.remove_descriptor(d)
finally:
if t is not None:
t.cancel()
@@ -223,7 +223,7 @@ def select(read_list, write_list, error_list, timeout=None):
def _spawn_startup(cb, args, kw, cancel=None):
try:
greenlib.switch(greenlet.getcurrent().parent)
greenlet.getcurrent().parent.switch()
cancel = None
finally:
if cancel is not None:
@@ -232,7 +232,25 @@ def _spawn_startup(cb, args, kw, cancel=None):
def _spawn(g):
g.parent = greenlet.getcurrent()
greenlib.switch(g)
g.switch()
class CancellingTimersGreenlet(greenlet.greenlet):
def __init__(self, run=None, parent=None, hub=None):
self._run = run
if parent is None:
parent = greenlet.getcurrent()
if hub is None:
hub = get_hub()
self.hub = hub
greenlet.greenlet.__init__(self, None, parent)
def run(self, *args, **kwargs):
try:
return self._run(*args, **kwargs)
finally:
self.hub.cancel_timers(self, quiet=True)
def spawn(function, *args, **kwds):
@@ -251,17 +269,18 @@ def spawn(function, *args, **kwds):
"""
# killable
t = None
g = greenlib.tracked_greenlet()
t = get_hub().schedule_call(0, _spawn, g)
greenlib.switch(g, (_spawn_startup, function, args, kwds, t.cancel))
g = CancellingTimersGreenlet(_spawn_startup)
t = get_hub().schedule_call_global(0, _spawn, g)
g.switch(function, args, kwds, t.cancel)
return g
def kill(g):
get_hub().schedule_call(0, greenlib.kill, g)
get_hub().schedule_call(0, g.throw)
sleep(0)
def call_after(seconds, function, *args, **kwds):
def call_after_global(seconds, function, *args, **kwds):
"""Schedule *function* to be called after *seconds* have elapsed.
The function will be scheduled even if the current greenlet has exited.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *function* will be called with the given *args* and
@@ -272,12 +291,35 @@ def call_after(seconds, function, *args, **kwds):
"""
# cancellable
def startup():
g = greenlib.tracked_greenlet()
greenlib.switch(g, (_spawn_startup, function, args, kwds))
greenlib.switch(g)
return get_hub().schedule_call(seconds, startup)
g = CancellingTimersGreenlet(_spawn_startup)
g.switch(function, args, kwds)
g.switch()
t = get_hub().schedule_call_global(seconds, startup)
return t
def call_after_local(seconds, function, *args, **kwds):
"""Schedule *function* to be called after *seconds* have elapsed.
The function will NOT be called if the current greenlet has exited.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *function* will be called with the given *args* and
keyword arguments *kwds*, and will be executed within the main loop's
coroutine.
Its return value is discarded. Any uncaught exception will be logged.
"""
# cancellable
def startup():
g = CancellingTimersGreenlet(_spawn_startup)
g.switch(function, args, kwds)
g.switch()
t = get_hub().schedule_call_local(seconds, startup)
return t
# for compatibility with original eventlet API
call_after = call_after_local
def with_timeout(seconds, func, *args, **kwds):
"""Wrap a call to some (yielding) function with a timeout; if the called
function fails to return before the timeout, cancel it and return a flag
@@ -422,15 +464,13 @@ def sleep(seconds=0):
"""
hub = get_hub()
assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
timer = hub.schedule_call(seconds, greenlib.switch, greenlet.getcurrent())
timer = hub.schedule_call(seconds, greenlet.getcurrent().switch)
try:
hub.switch()
finally:
timer.cancel()
switch = greenlib.switch
local_dict = greenlib.greenlet_dict
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit

View File

@@ -25,7 +25,7 @@ THE SOFTWARE.
import collections
from eventlet import api, greenlib
from eventlet import api
from eventlet.support import greenlet
__all__ = ['channel']
@@ -49,8 +49,13 @@ class channel(object):
def _tasklet_loop(self):
deque = self.deque = collections.deque()
hub = api.get_hub()
switch = greenlib.switch
direction, caller, args = switch()
current = greenlet.getcurrent()
def switch(g, value=None, exc=None):
if exc is None:
return g.switch(value)
else:
return g.throw(exc)
direction, caller, args = switch(current.parent or current)
try:
while True:
if direction == -1:
@@ -80,12 +85,12 @@ class channel(object):
try:
t = self._tasklet
except AttributeError:
t = self._tasklet = greenlib.tracked_greenlet()
greenlib.switch(t, (self._tasklet_loop,))
t = self._tasklet = greenlet.greenlet(self._tasklet_loop)
t.switch()
if args:
return greenlib.switch(t, (1, greenlet.getcurrent(), args))
return t.switch((1, greenlet.getcurrent(), args))
else:
return greenlib.switch(t, (-1, greenlet.getcurrent(), args))
return t.switch((-1, greenlet.getcurrent(), args))
def receive(self):
return self._send_tasklet()

View File

@@ -31,7 +31,6 @@ import traceback
from eventlet import api
from eventlet import channel
from eventlet import pools
from eventlet import greenlib
try:
@@ -182,7 +181,7 @@ class event(object):
if waiter in self._waiters:
del self._waiters[waiter]
api.get_hub().schedule_call(
0, greenlib.switch, waiter, None, Cancelled())
0, waiter.throw, Cancelled())
def send(self, result=None, exc=None):
"""Makes arrangements for the waiters to be woken with the
@@ -215,8 +214,7 @@ class event(object):
self._exc = exc
hub = api.get_hub()
for waiter in self._waiters:
hub.schedule_call(0, greenlib.switch, waiter, self._result)
hub.schedule_call(0, waiter.switch, self._result)
class semaphore(object):
"""Classic semaphore implemented with a counter and an event.

View File

@@ -1,16 +1,21 @@
"""implements standard module 'thread' with greenlets"""
from __future__ import absolute_import
import thread as thread_module
from eventlet.greenlib import greenlet_id as get_ident
from eventlet.support import greenlet
from eventlet.api import spawn
from eventlet.coros import semaphore as LockType
error = thread_module.error
def get_ident(gr=None):
if gr is None:
return id(greenlet.getcurrent())
else:
return id(gr)
def start_new_thread(function, args=(), kwargs={}):
g = spawn(function, *args, **kwargs)
return get_ident(g) or 0 # XXX 0 only for main greenlet, None for the rest untracked
return get_ident(g)
def allocate_lock():
return LockType(1)

View File

@@ -30,8 +30,6 @@ import traceback
import time
from eventlet.support import greenlet
from eventlet import greenlib
from eventlet.timer import Timer
_g_debug = True
@@ -49,7 +47,7 @@ class BaseHub(object):
self.waiters_by_greenlet = {}
self.clock = clock
self.greenlet = None
self.greenlet = greenlet.greenlet(self.run)
self.stopping = False
self.running = False
self.timers = []
@@ -106,7 +104,7 @@ class BaseHub(object):
fileno = self.waiters_by_greenlet.pop(gr, None)
if fileno is not None:
self.remove_descriptor(fileno)
greenlib.switch(gr, None, exception_object)
gr.throw(exception_object)
def exc_descriptor(self, fileno):
exc = self.excs.get(fileno)
@@ -122,16 +120,13 @@ class BaseHub(object):
self.switch()
def switch(self):
if not self.greenlet:
self.greenlet = greenlib.tracked_greenlet()
args = ((self.run,),)
else:
args = ()
if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run)
try:
greenlet.getcurrent().parent = self.greenlet
except ValueError:
pass
return greenlib.switch(self.greenlet, *args)
return self.greenlet.switch()
def squelch_exception(self, fileno, exc_info):
traceback.print_exception(*exc_info)

View File

@@ -29,7 +29,6 @@ import errno
import traceback
import time
from eventlet import greenlib
from eventlet.timer import Timer
from eventlet.hubs import hub

View File

@@ -36,7 +36,6 @@ if mydir not in sys.path:
from eventlet import api
from eventlet import greenlib
from eventlet import httpc
from eventlet.hubs import hub
from eventlet import util
@@ -112,7 +111,7 @@ class Hub(hub.BaseHub):
result = to_call[0](to_call[1])
del self.to_call
return result
greenlib.switch(self.current_application, self.poll(int(seconds*1000)))
self.current_application.switch(self.poll(int(seconds*1000)))
def application(self, env, start_response):
print "ENV",env
@@ -151,8 +150,8 @@ class Hub(hub.BaseHub):
yield x
return
result = self.switch()
if not isinstance(result, tuple):
result = (result, None) ## TODO Fix greenlib's return values
#if not isinstance(result, tuple):
# result = (result, None) ## TODO Fix greenlib's return values
def application(env, start_response):

View File

@@ -31,7 +31,6 @@ import traceback
from time import sleep
import time
from eventlet import greenlib
from eventlet.hubs import hub
EXC_MASK = select.POLLERR | select.POLLHUP | select.POLLNVAL

View File

@@ -3,7 +3,6 @@ import threading
import weakref
from twisted.internet.base import DelayedCall as TwistedDelayedCall
from eventlet.hubs.hub import _g_debug
from eventlet import greenlib
from eventlet.support.greenlet import greenlet
import traceback
@@ -78,7 +77,7 @@ class BaseTwistedHub(object):
greenlet.getcurrent().parent = self.greenlet
except ValueError, ex:
pass
return greenlib.switch(self.greenlet)
return self.greenlet.switch()
def stop(self):
from twisted.internet import reactor
@@ -105,7 +104,7 @@ class BaseTwistedHub(object):
fileno = self.waiters_by_greenlet.pop(gr, None)
if fileno is not None:
self.remove_descriptor(fileno)
greenlib.switch(gr, None, exception_object)
gr.throw(exception_object)
# required by GreenSocket
def exc_descriptor(self, _fileno):
@@ -212,7 +211,6 @@ class BaseTwistedHub(object):
return len(reactor.getDelayedCalls())
class TwistedHub(BaseTwistedHub):
# wrapper around reactor that runs reactor's main loop in a separate greenlet.
# whenever you need to wait, i.e. inside a call that must appear
@@ -235,19 +233,17 @@ class TwistedHub(BaseTwistedHub):
assert Hub.state==0, ('This hub can only be instantiated once', Hub.state)
Hub.state = 1
make_twisted_threadpool_daemonic() # otherwise the program would hang after the main greenlet exited
BaseTwistedHub.__init__(self, None)
g = greenlet(self.run)
BaseTwistedHub.__init__(self, g)
def switch(self):
if not self.greenlet:
self.greenlet = greenlib.tracked_greenlet()
args = ((self.run,),)
else:
args = ()
if self.greenlet.dead:
self.greenlet = greenlet(self.run)
try:
greenlet.getcurrent().parent = self.greenlet
except ValueError, ex:
pass
return greenlib.switch(self.greenlet, *args)
return self.greenlet.switch()
def run(self, installSignalHandlers=None):
if installSignalHandlers is None:

View File

@@ -1,18 +1,17 @@
from twisted.internet import defer
from twisted.python import failure
from eventlet.support.greenlet import greenlet
from eventlet import greenlib
from eventlet.api import get_hub, spawn
def block_on(deferred):
cur = [greenlet.getcurrent()]
def cb(value):
if cur:
greenlib.switch(cur[0], value)
cur[0].switch(value)
return value
def eb(err):
if cur:
greenlib.switch(cur[0], exc=(err.type, err.value, err.tb))
err.throwExceptionIntoGenerator(cur[0])
return err
deferred.addCallback(cb)
deferred.addErrback(eb)
@@ -41,7 +40,10 @@ def callInGreenThread(func, *args, **kwargs):
if __name__=='__main__':
import sys
try:
num = int(sys.argv[1])
except:
sys.exit('Supply number of test as an argument, 0, 1, 2 or 3')
from twisted.internet import reactor
def test():
print block_on(reactor.resolver.getHostByName('www.google.com'))
@@ -51,7 +53,9 @@ if __name__=='__main__':
elif num==1:
spawn(test)
from eventlet.api import sleep
print 'sleeping..'
sleep(5)
print 'done sleeping..'
elif num==2:
from eventlet.twistedutil import join_reactor
spawn(test)