Moved a test.

This commit is contained in:
Ryan Williams
2010-01-25 18:10:13 -08:00
parent c856c95729
commit af59d5cc4d
2 changed files with 40 additions and 37 deletions

View File

@@ -154,42 +154,6 @@ class TestApi(TestCase):
def test_naming_missing_class(self):
self.assertRaises(
ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
def test_timeout_and_final_write(self):
# This test verifies that a write on a socket that we've
# stopped listening for doesn't result in an incorrect switch
server = api.tcp_listener(('127.0.0.1', 0))
bound_port = server.getsockname()[1]
def sender(evt):
s2, addr = server.accept()
wrap_wfile = s2.makefile()
api.sleep(0.02)
wrap_wfile.write('hi')
s2.close()
evt.send('sent via event')
from eventlet import coros
evt = coros.Event()
api.spawn(sender, evt)
api.sleep(0) # lets the socket enter accept mode, which
# is necessary for connect to succeed on windows
try:
# try and get some data off of this pipe
# but bail before any is sent
api.exc_after(0.01, api.TimeoutError)
client = api.connect_tcp(('127.0.0.1', bound_port))
wrap_rfile = client.makefile()
_c = wrap_rfile.read(1)
self.fail()
except api.TimeoutError:
pass
result = evt.wait()
self.assertEquals(result, 'sent via event')
server.close()
client.close()
def test_killing_dormant(self):
@@ -205,7 +169,6 @@ class TestApi(TestCase):
pass
# when switching to hub, hub makes itself the parent of this greenlet,
# thus after the function's done, the control will go to the parent
# QQQ why the first sleep is not enough?
api.sleep(0)
state.append('finished')
g = api.spawn(test)

View File

@@ -219,7 +219,47 @@ class TestGreenIo(LimitedTestCase):
sock.bind(('127.0.0.1', 0))
sock.listen(50)
ssl_sock = ssl.wrap_socket(sock)
def test_timeout_and_final_write(self):
# This test verifies that a write on a socket that we've
# stopped listening for doesn't result in an incorrect switch
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0))
server.listen(50)
bound_port = server.getsockname()[1]
def sender(evt):
s2, addr = server.accept()
wrap_wfile = s2.makefile()
eventlet.sleep(0.02)
wrap_wfile.write('hi')
s2.close()
evt.send('sent via event')
from eventlet import event
evt = event.Event()
eventlet.spawn(sender, evt)
eventlet.sleep(0) # lets the socket enter accept mode, which
# is necessary for connect to succeed on windows
try:
# try and get some data off of this pipe
# but bail before any is sent
eventlet.exc_after(0.01, eventlet.TimeoutError)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', bound_port))
wrap_rfile = client.makefile()
_c = wrap_rfile.read(1)
self.fail()
except eventlet.TimeoutError:
pass
result = evt.wait()
self.assertEquals(result, 'sent via event')
server.close()
client.close()
class TestGreenIoLong(LimitedTestCase):
TEST_TIMEOUT=10 # the test here might take a while depending on the OS