This commit is contained in:
Ryan Williams
2010-02-24 00:47:35 -05:00
16 changed files with 209 additions and 111 deletions

View File

@@ -1,10 +1,9 @@
import sys
import threading
from twisted.internet.base import DelayedCall as TwistedDelayedCall
from eventlet import api
from eventlet import getcurrent, greenlet
from eventlet.hubs.hub import FdListener, READ, WRITE
class DelayedCall(TwistedDelayedCall):
"fix DelayedCall to behave like eventlet's Timer in some respects"
@@ -17,7 +16,7 @@ class DelayedCall(TwistedDelayedCall):
class LocalDelayedCall(DelayedCall):
def __init__(self, *args, **kwargs):
self.greenlet = api.getcurrent()
self.greenlet = getcurrent()
DelayedCall.__init__(self, *args, **kwargs)
def _get_cancelled(self):
@@ -77,7 +76,7 @@ class socket_rwdescriptor(FdListener):
# (it has no idea it was switched away). So, we restart the mainloop.
# XXX this is not enough, pollreactor prints the traceback for this and epollreactor
# times out. see test__hub.TestCloseSocketWhilePolling
raise api.GreenletExit
raise greenlet.GreenletExit
logstr = "twistedr"

View File

@@ -1,4 +1,5 @@
from eventlet import coros, proc, api
from eventlet.semaphore import Semaphore
import warnings
warnings.warn("The pool module is deprecated. Please use the "
@@ -10,7 +11,7 @@ class Pool(object):
if min_size > max_size:
raise ValueError('min_size cannot be bigger than max_size')
self.max_size = max_size
self.sem = coros.Semaphore(max_size)
self.sem = Semaphore(max_size)
self.procs = proc.RunningProcSet()
if track_events:
self.results = coros.queue()

View File

@@ -1,5 +1,5 @@
from eventlet.api import spawn, getcurrent
from eventlet.hubs import get_hub
from eventlet import spawn, getcurrent
def block_on(deferred):
cur = [getcurrent()]

View File

@@ -6,8 +6,8 @@ from twisted.internet.protocol import Factory, ClientFactory
from twisted.internet import main
from twisted.python import failure
from eventlet import proc
from eventlet.api import getcurrent
from eventlet import greenthread
from eventlet import getcurrent
from eventlet.coros import Queue
from eventlet.event import Event as BaseEvent
@@ -381,7 +381,7 @@ class SimpleSpawnFactory(Factory):
return protocol
def _do_spawn(self, gtransport, protocol):
proc.spawn_greenlet(self._run_handler, gtransport, protocol)
greenthread.spawn(self._run_handler, gtransport, protocol)
def _run_handler(self, gtransport, protocol):
try:
@@ -402,10 +402,13 @@ class SpawnFactory(SimpleSpawnFactory):
SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
def _do_spawn(self, gtransport, protocol):
g = proc.spawn(self._run_handler, gtransport, protocol)
g = greenthread.spawn(self._run_handler, gtransport, protocol)
self.greenlets.add(g)
g.link(lambda *_: self.greenlets.remove(g))
def waitall(self):
return proc.waitall(self.greenlets)
results = []
for g in self.greenlets:
results.append(g.wait())
return results

View File

@@ -3,6 +3,9 @@ import sys
import os
import errno
import unittest
import warnings
from eventlet import debug, hubs
# convenience for importers
main = unittest.main
@@ -105,6 +108,15 @@ class LimitedTestCase(unittest.TestCase):
def tearDown(self):
self.timer.cancel()
try:
hub = hubs.get_hub()
num_readers = len(hub.get_readers())
num_writers = len(hub.get_writers())
assert num_readers == num_writers == 0
except AssertionError, e:
print "ERROR: Hub not empty"
print debug.format_hub_timers()
print debug.format_hub_listeners()
def verify_hub_empty():
@@ -112,7 +124,7 @@ def verify_hub_empty():
hub = hubs.get_hub()
num_readers = len(hub.get_readers())
num_writers = len(hub.get_writers())
num_timers = len(hub.get_timers_count())
num_timers = hub.get_timers_count()
assert num_readers == 0 and num_writers == 0, "Readers: %s Writers: %s" % (num_readers, num_writers)
@@ -123,3 +135,12 @@ def find_command(command):
return p
raise IOError(errno.ENOENT, 'Command not found: %r' % command)
def silence_warnings(func):
def wrapper(*args, **kw):
warnings.simplefilter('ignore', DeprecationWarning)
try:
return func(*args, **kw)
finally:
warnings.simplefilter('default', DeprecationWarning)
wrapper.__name__ = func.__name__
return wrapper

View File

@@ -2,11 +2,12 @@ import os
import os.path
import socket
from unittest import TestCase, main
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import api
from eventlet import greenio
from eventlet import util
from eventlet import hubs
warnings.simplefilter('default', DeprecationWarning)
from eventlet import greenio, util, hubs, greenthread, spawn
def check_hub():
# Clear through the descriptor queue
@@ -29,7 +30,7 @@ class TestApi(TestCase):
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
def test_tcp_listener(self):
socket = api.tcp_listener(('0.0.0.0', 0))
socket = greenio.listen(('0.0.0.0', 0))
assert socket.getsockname()[0] == '0.0.0.0'
socket.close()
@@ -46,10 +47,10 @@ class TestApi(TestCase):
finally:
listenfd.close()
server = api.tcp_listener(('0.0.0.0', 0))
server = greenio.listen(('0.0.0.0', 0))
api.spawn(accept_once, server)
client = api.connect_tcp(('127.0.0.1', server.getsockname()[1]))
client = greenio.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile()
client.close()
assert fd.readline() == 'hello\n'
@@ -75,7 +76,7 @@ class TestApi(TestCase):
self.private_key_file)
api.spawn(accept_once, server)
raw_client = api.connect_tcp(('127.0.0.1', server.getsockname()[1]))
raw_client = greenio.connect(('127.0.0.1', server.getsockname()[1]))
client = util.wrap_ssl(raw_client)
fd = socket._fileobject(client, 'rb', 8192)
@@ -92,16 +93,15 @@ class TestApi(TestCase):
def test_001_trampoline_timeout(self):
from eventlet import coros
server_sock = api.tcp_listener(('127.0.0.1', 0))
server_sock = greenio.listen(('127.0.0.1', 0))
bound_port = server_sock.getsockname()[1]
def server(sock):
client, addr = sock.accept()
api.sleep(0.1)
server_evt = coros.execute(server, server_sock)
server_evt = spawn(server, server_sock)
api.sleep(0)
try:
desc = greenio.GreenSocket(util.tcp_socket())
desc.connect(('127.0.0.1', bound_port))
desc = greenio.connect(('127.0.0.1', bound_port))
api.trampoline(desc, read=True, write=False, timeout=0.001)
except api.TimeoutError:
pass # test passed
@@ -112,7 +112,7 @@ class TestApi(TestCase):
check_hub()
def test_timeout_cancel(self):
server = api.tcp_listener(('0.0.0.0', 0))
server = greenio.listen(('0.0.0.0', 0))
bound_port = server.getsockname()[1]
done = [False]
@@ -122,20 +122,17 @@ class TestApi(TestCase):
conn.close()
def go():
client = util.tcp_socket()
desc = greenio.GreenSocket(client)
desc.connect(('127.0.0.1', bound_port))
desc = greenio.connect(('127.0.0.1', bound_port))
try:
api.trampoline(desc, read=True, timeout=0.1)
except api.TimeoutError:
assert False, "Timed out"
server.close()
client.close()
desc.close()
done[0] = True
api.call_after(0, go)
greenthread.spawn_after_local(0, go)
server_coro = api.spawn(client_closer, server)
while not done[0]:

View File

@@ -1,5 +1,5 @@
from unittest import main, TestCase
from tests import LimitedTestCase
from tests import LimitedTestCase, silence_warnings
import eventlet
from eventlet import coros
from eventlet import event
@@ -13,6 +13,8 @@ class IncrActor(coros.Actor):
class TestActor(LimitedTestCase):
mode = 'static'
@silence_warnings
def setUp(self):
super(TestActor, self).setUp()
self.actor = IncrActor()
@@ -93,6 +95,7 @@ class TestActor(LimitedTestCase):
evt.wait()
self.assertEqual(['should_appear'], msgs)
@silence_warnings
def test_multiple(self):
self.actor = IncrActor(concurrency=2)
total = [0]

View File

@@ -1,8 +1,7 @@
from unittest import TestCase, main
import eventlet
from eventlet import api
from eventlet import coros
from eventlet import Queue
from eventlet import pools
class IntPool(pools.Pool):
@@ -40,31 +39,31 @@ class TestIntPool(TestCase):
self.assertEquals(self.pool.free(), 4)
def test_exhaustion(self):
waiter = coros.queue(0)
waiter = Queue(0)
def consumer():
gotten = None
try:
gotten = self.pool.get()
finally:
waiter.send(gotten)
waiter.put(gotten)
api.spawn(consumer)
eventlet.spawn(consumer)
one, two, three, four = (
self.pool.get(), self.pool.get(), self.pool.get(), self.pool.get())
self.assertEquals(self.pool.free(), 0)
# Let consumer run; nothing will be in the pool, so he will wait
api.sleep(0)
eventlet.sleep(0)
# Wake consumer
self.pool.put(one)
# wait for the consumer
self.assertEquals(waiter.wait(), one)
self.assertEquals(waiter.get(), one)
def test_blocks_on_pool(self):
waiter = coros.queue(0)
waiter = Queue(0)
def greedy():
self.pool.get()
self.pool.get()
@@ -74,24 +73,24 @@ class TestIntPool(TestCase):
self.assertEquals(self.pool.waiting(), 0)
# The call to the next get will unschedule this routine.
self.pool.get()
# So this send should never be called.
waiter.send('Failed!')
# So this put should never be called.
waiter.put('Failed!')
killable = api.spawn(greedy)
killable = eventlet.spawn(greedy)
# no one should be waiting yet.
self.assertEquals(self.pool.waiting(), 0)
## Wait for greedy
api.sleep(0)
eventlet.sleep(0)
## Greedy should be blocking on the last get
self.assertEquals(self.pool.waiting(), 1)
## Send will never be called, so balance should be 0.
self.assertFalse(waiter.ready())
self.assertFalse(not waiter.full())
api.kill(killable)
eventlet.kill(killable)
def test_ordering(self):
# normal case is that items come back out in the
@@ -107,17 +106,17 @@ class TestIntPool(TestCase):
try:
size = 2
self.pool = IntPool(min_size=0, max_size=size)
queue = coros.queue()
queue = Queue()
results = []
def just_put(pool_item, index):
self.pool.put(pool_item)
queue.send(index)
queue.put(index)
for index in xrange(size + 1):
pool_item = self.pool.get()
api.spawn(just_put, pool_item, index)
eventlet.spawn(just_put, pool_item, index)
for _ in range(size+1):
x = queue.wait()
x = queue.get()
results.append(x)
self.assertEqual(sorted(results), range(size + 1))
finally:

View File

@@ -1,7 +1,10 @@
import sys
import warnings
from tests import LimitedTestCase, main, skip_on_windows
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import processes, api
warnings.simplefilter('default', DeprecationWarning)
class TestEchoPool(LimitedTestCase):
def setUp(self):

View File

@@ -1,5 +1,8 @@
from eventlet import api, saranwrap
from eventlet import greenpool
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import saranwrap
warnings.simplefilter('default', DeprecationWarning)
from eventlet import greenpool, sleep
import os
import eventlet
@@ -350,7 +353,7 @@ sys_path = sys.path""")
# sleep for a bit to make sure out coroutine ran by the time
# we check the assert below
api.sleep(0.1)
sleep(0.1)
self.assert_(
'random' in obj_proxy.get_dict(),
@@ -363,7 +366,7 @@ sys_path = sys.path""")
pid = saranwrap.getpid(prox)
self.assertEqual(os.kill(pid, 0), None) # assert that the process is running
del prox # removing all references to the proxy should kill the child process
api.sleep(0.1) # need to let the signal handler run
sleep(0.1) # need to let the signal handler run
self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist
@skip_on_windows

View File

@@ -1,21 +1,25 @@
from tests import LimitedTestCase
from tests import LimitedTestCase, silence_warnings
from unittest import main
import eventlet
from eventlet import api, coros, proc
from eventlet import coros, spawn, sleep
from eventlet.event import Event
class TestQueue(LimitedTestCase):
@silence_warnings
def test_send_first(self):
q = coros.queue()
q.send('hi')
self.assertEquals(q.wait(), 'hi')
@silence_warnings
def test_send_exception_first(self):
q = coros.queue()
q.send(exc=RuntimeError())
self.assertRaises(RuntimeError, q.wait)
@silence_warnings
def test_send_last(self):
q = coros.queue()
def waiter(q):
@@ -23,11 +27,12 @@ class TestQueue(LimitedTestCase):
self.assertEquals(q.wait(), 'hi2')
timer.cancel()
api.spawn(waiter, q)
api.sleep(0)
api.sleep(0)
spawn(waiter, q)
sleep(0)
sleep(0)
q.send('hi2')
@silence_warnings
def test_max_size(self):
q = coros.queue(2)
results = []
@@ -40,15 +45,16 @@ class TestQueue(LimitedTestCase):
q.send('c')
results.append('c')
api.spawn(putter, q)
api.sleep(0)
spawn(putter, q)
sleep(0)
self.assertEquals(results, ['a', 'b'])
self.assertEquals(q.wait(), 'a')
api.sleep(0)
sleep(0)
self.assertEquals(results, ['a', 'b', 'c'])
self.assertEquals(q.wait(), 'b')
self.assertEquals(q.wait(), 'c')
@silence_warnings
def test_zero_max_size(self):
q = coros.queue(0)
def sender(evt, q):
@@ -59,16 +65,17 @@ class TestQueue(LimitedTestCase):
x = q.wait()
evt.send(x)
e1 = coros.Event()
e2 = coros.Event()
e1 = Event()
e2 = Event()
api.spawn(sender, e1, q)
api.sleep(0)
spawn(sender, e1, q)
sleep(0)
self.assert_(not e1.ready())
api.spawn(receiver, e2, q)
spawn(receiver, e2, q)
self.assertEquals(e2.wait(),'hi')
self.assertEquals(e1.wait(),'done')
@silence_warnings
def test_multiple_waiters(self):
# tests that multiple waiters get their results back
q = coros.queue()
@@ -88,6 +95,7 @@ class TestQueue(LimitedTestCase):
results.add(gt.wait())
self.assertEquals(results, set(sendings))
@silence_warnings
def test_waiters_that_cancel(self):
q = coros.queue()
@@ -100,22 +108,24 @@ class TestQueue(LimitedTestCase):
evt.send('timed out')
evt = coros.Event()
api.spawn(do_receive, q, evt)
evt = Event()
spawn(do_receive, q, evt)
self.assertEquals(evt.wait(), 'timed out')
q.send('hi')
self.assertEquals(q.wait(), 'hi')
@silence_warnings
def test_senders_that_die(self):
q = coros.queue()
def do_send(q):
q.send('sent')
api.spawn(do_send, q)
spawn(do_send, q)
self.assertEquals(q.wait(), 'sent')
@silence_warnings
def test_two_waiters_one_dies(self):
def waiter(q, evt):
evt.send(q.wait())
@@ -128,15 +138,16 @@ class TestQueue(LimitedTestCase):
evt.send('timed out')
q = coros.queue()
dying_evt = coros.Event()
waiting_evt = coros.Event()
api.spawn(do_receive, q, dying_evt)
api.spawn(waiter, q, waiting_evt)
api.sleep(0)
dying_evt = Event()
waiting_evt = Event()
spawn(do_receive, q, dying_evt)
spawn(waiter, q, waiting_evt)
sleep(0)
q.send('hi')
self.assertEquals(dying_evt.wait(), 'timed out')
self.assertEquals(waiting_evt.wait(), 'hi')
@silence_warnings
def test_two_bogus_waiters(self):
def do_receive(q, evt):
eventlet.Timeout(0, RuntimeError())
@@ -147,28 +158,29 @@ class TestQueue(LimitedTestCase):
evt.send('timed out')
q = coros.queue()
e1 = coros.Event()
e2 = coros.Event()
api.spawn(do_receive, q, e1)
api.spawn(do_receive, q, e2)
api.sleep(0)
e1 = Event()
e2 = Event()
spawn(do_receive, q, e1)
spawn(do_receive, q, e2)
sleep(0)
q.send('sent')
self.assertEquals(e1.wait(), 'timed out')
self.assertEquals(e2.wait(), 'timed out')
self.assertEquals(q.wait(), 'sent')
@silence_warnings
def test_waiting(self):
def do_wait(q, evt):
result = q.wait()
evt.send(result)
q = coros.queue()
e1 = coros.Event()
api.spawn(do_wait, q, e1)
api.sleep(0)
e1 = Event()
spawn(do_wait, q, e1)
sleep(0)
self.assertEquals(1, q.waiting())
q.send('hi')
api.sleep(0)
sleep(0)
self.assertEquals(0, q.waiting())
self.assertEquals('hi', e1.wait())
self.assertEquals(0, q.waiting())
@@ -176,8 +188,9 @@ class TestQueue(LimitedTestCase):
class TestChannel(LimitedTestCase):
@silence_warnings
def test_send(self):
api.sleep(0.1)
sleep(0.1)
channel = coros.queue(0)
events = []
@@ -186,7 +199,7 @@ class TestChannel(LimitedTestCase):
events.append(channel.wait())
events.append(channel.wait())
proc.spawn(another_greenlet)
spawn(another_greenlet)
events.append('sending')
channel.send('hello')
@@ -197,8 +210,9 @@ class TestChannel(LimitedTestCase):
self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
@silence_warnings
def test_wait(self):
api.sleep(0.1)
sleep(0.1)
channel = coros.queue(0)
events = []
@@ -209,27 +223,28 @@ class TestChannel(LimitedTestCase):
channel.send('world')
events.append('sent world')
proc.spawn(another_greenlet)
spawn(another_greenlet)
events.append('waiting')
events.append(channel.wait())
events.append(channel.wait())
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
api.sleep(0)
sleep(0)
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
@silence_warnings
def test_waiters(self):
c = coros.Channel()
w1 = eventlet.spawn(c.wait)
w2 = eventlet.spawn(c.wait)
w3 = eventlet.spawn(c.wait)
api.sleep(0)
sleep(0)
self.assertEquals(c.waiting(), 3)
s1 = eventlet.spawn(c.send, 1)
s2 = eventlet.spawn(c.send, 2)
s3 = eventlet.spawn(c.send, 3)
api.sleep(0) # this gets all the sends into a waiting state
sleep(0) # this gets all the sends into a waiting state
self.assertEquals(c.waiting(), 0)
s1.wait()

View File

@@ -4,12 +4,16 @@ If either operation blocked the whole script would block and timeout.
"""
import unittest
from eventlet.green import urllib2, BaseHTTPServer
from eventlet.api import spawn, kill
from eventlet import spawn, kill
class QuietHandler(BaseHTTPServer.BaseHTTPRequestHandler):
protocol_version = "HTTP/1.0"
def log_message(self, *args, **kw):
pass
def start_http_server():
server_address = ('localhost', 0)
BaseHTTPServer.BaseHTTPRequestHandler.protocol_version = "HTTP/1.0"
httpd = BaseHTTPServer.HTTPServer(server_address, BaseHTTPServer.BaseHTTPRequestHandler)
httpd = BaseHTTPServer.HTTPServer(server_address, QuietHandler)
sa = httpd.socket.getsockname()
#print "Serving HTTP on", sa[0], "port", sa[1], "..."
httpd.request_count = 0

View File

@@ -1,5 +1,8 @@
import eventlet
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import pool, coros, api, hubs, timeout
warnings.simplefilter('default', DeprecationWarning)
from eventlet import event as _event
from tests import LimitedTestCase
from unittest import main

View File

@@ -1,9 +1,13 @@
import sys
import unittest
from eventlet.api import sleep, with_timeout
from eventlet import api, proc, coros
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import proc
warnings.simplefilter('default', DeprecationWarning)
from eventlet import coros
from eventlet import event as _event
from tests import LimitedTestCase, skipped
from eventlet import Timeout, sleep, getcurrent, with_timeout
from tests import LimitedTestCase, skipped, silence_warnings
DELAY = 0.01
@@ -12,37 +16,39 @@ class ExpectedError(Exception):
class TestLink_Signal(LimitedTestCase):
@silence_warnings
def test_send(self):
s = proc.Source()
q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
s.link_value(q1)
self.assertRaises(api.TimeoutError, s.wait, 0)
self.assertRaises(Timeout, s.wait, 0)
assert s.wait(0, None) is None
assert s.wait(0.001, None) is None
self.assertRaises(api.TimeoutError, s.wait, 0.001)
self.assertRaises(Timeout, s.wait, 0.001)
s.send(1)
assert not q1.ready()
assert s.wait()==1
api.sleep(0)
sleep(0)
assert q1.ready()
s.link_exception(q2)
s.link(q3)
assert not q2.ready()
api.sleep(0)
sleep(0)
assert q3.ready()
assert s.wait()==1
@silence_warnings
def test_send_exception(self):
s = proc.Source()
q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
s.link_exception(q1)
s.send_exception(OSError('hello'))
api.sleep(0)
sleep(0)
assert q1.ready()
s.link_value(q2)
s.link(q3)
assert not q2.ready()
api.sleep(0)
sleep(0)
assert q3.ready()
self.assertRaises(OSError, q1.wait)
self.assertRaises(OSError, q3.wait)
@@ -53,10 +59,10 @@ class TestProc(LimitedTestCase):
def test_proc(self):
p = proc.spawn(lambda : 100)
receiver = proc.spawn(api.sleep, 1)
receiver = proc.spawn(sleep, 1)
p.link(receiver)
self.assertRaises(proc.LinkedCompleted, receiver.wait)
receiver2 = proc.spawn(api.sleep, 1)
receiver2 = proc.spawn(sleep, 1)
p.link(receiver2)
self.assertRaises(proc.LinkedCompleted, receiver2.wait)
@@ -210,8 +216,9 @@ class TestRaise_link(TestCase):
self.check_timed_out(*xxxxx)
@silence_warnings
def test_raise(self):
p = self.p = proc.spawn(lambda : api.getcurrent().throw(ExpectedError('test_raise')))
p = self.p = proc.spawn(lambda : getcurrent().throw(ExpectedError('test_raise')))
self._test_raise(p, True, proc.LinkedFailed)
# repeating the same with dead process
for _ in xrange(3):
@@ -242,6 +249,7 @@ class TestRaise_link(TestCase):
self.check_timed_out(*xxxxx)
@silence_warnings
def test_kill(self):
p = self.p = proc.spawn(sleep, DELAY)
self._test_kill(p, True, proc.LinkedKilled)
@@ -277,7 +285,7 @@ class TestStuff(LimitedTestCase):
return 1
x = proc.spawn(x)
z = proc.spawn(lambda : 3)
y = proc.spawn(lambda : api.getcurrent().throw(ExpectedError('test_wait_error')))
y = proc.spawn(lambda : getcurrent().throw(ExpectedError('test_wait_error')))
y.link(x)
x.link(y)
y.link(z)
@@ -293,12 +301,12 @@ class TestStuff(LimitedTestCase):
sleep(0.1)
raise ExpectedError('first')
a = proc.spawn(first)
b = proc.spawn(lambda : api.getcurrent().throw(ExpectedError('second')))
b = proc.spawn(lambda : getcurrent().throw(ExpectedError('second')))
try:
proc.waitall([a, b])
except ExpectedError, ex:
assert 'second' in str(ex), repr(str(ex))
api.sleep(0.2) # sleep to ensure that the other timer is raised
sleep(0.2) # sleep to ensure that the other timer is raised
def test_multiple_listeners_error(self):
# if there was an error while calling a callback
@@ -321,7 +329,7 @@ class TestStuff(LimitedTestCase):
sleep(DELAY*10)
assert results in [[10, 20], [20, 10]], results
p = proc.spawn(lambda : api.getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
p = proc.spawn(lambda : getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
results = []
p.link(listener1)
p.link(listener2)

View File

@@ -17,7 +17,7 @@ except ImportError:
class reactor(object):
pass
from eventlet.api import spawn, sleep, with_timeout, call_after
from eventlet import spawn, sleep, with_timeout, spawn_after
from eventlet.coros import Event
try:
@@ -144,7 +144,7 @@ class TestGreenTransport(TestUnbufferedTransport):
@requires_twisted
def test_pauseresume_producing(self):
self.conn.pauseProducing()
call_after(DELAY*5, self.conn.resumeProducing)
spawn_after(DELAY*5, self.conn.resumeProducing)
self.conn.write('hi\r\n')
result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out')
self.assertEqual('you said hi. BYE', result)

39
tests/thread_test.py Normal file
View File

@@ -0,0 +1,39 @@
from eventlet.green import thread
from eventlet import greenthread
from eventlet import event
import eventlet
from tests import LimitedTestCase
class Locals(LimitedTestCase):
def passthru(self, *args, **kw):
self.results.append((args, kw))
return args, kw
def setUp(self):
self.results = []
super(Locals, self).setUp()
def tearDown(self):
self.results = []
super(Locals, self).tearDown()
def test_simple(self):
tls = thread._local()
g_ids = []
evt = event.Event()
def setter(tls, v):
g_id = id(greenthread.getcurrent())
g_ids.append(g_id)
tls.value = v
evt.wait()
thread.start_new_thread(setter, args=(tls, 1))
thread.start_new_thread(setter, args=(tls, 2))
eventlet.sleep()
objs = object.__getattribute__(tls, "__objs")
self.failUnlessEqual(sorted(g_ids), sorted(objs.keys()))
self.failUnlessEqual(objs[g_ids[0]]['value'], 1)
self.failUnlessEqual(objs[g_ids[1]]['value'], 2)
self.failUnlessRaises(AttributeError, lambda: tls.value)
evt.send("done")
eventlet.sleep()