Files
deb-python-eventlet/tests/convenience_test.py
amajorek 1b07b9d5e5 py3k - more changes.
- sys.exc_clear does not exist in 3.x
- objects are not comparable by default in in 3.x. Timer needs to have __lt__ operator to be used by heapq.
- callable is gone in 3.x.
- tpool communication pipe changed to exchange binary data. It makes current GreenPipe work in 3.x. It is also faster and have less translation layers.
- 3.x socket uses bytes not str class. Used s2b to make tests working on both.
2010-03-04 23:57:50 -05:00

104 lines
3.6 KiB
Python

import eventlet
from eventlet import event
from tests import LimitedTestCase, s2b
class TestServe(LimitedTestCase):
def setUp(self):
super(TestServe, self).setUp()
from eventlet import debug
debug.hub_exceptions(False)
def tearDown(self):
super(TestServe, self).tearDown()
from eventlet import debug
debug.hub_exceptions(True)
def test_exiting_server(self):
# tests that the server closes the client sock on handle() exit
def closer(sock,addr):
pass
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, closer)
client = eventlet.connect(('localhost', l.getsockname()[1]))
client.sendall(s2b('a'))
self.assertFalse(client.recv(100))
gt.kill()
def test_excepting_server(self):
# tests that the server closes the client sock on handle() exception
def crasher(sock,addr):
sock.recv(1024)
0/0
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, crasher)
client = eventlet.connect(('localhost', l.getsockname()[1]))
client.sendall(s2b('a'))
self.assertRaises(ZeroDivisionError, gt.wait)
self.assertFalse(client.recv(100))
def test_excepting_server_already_closed(self):
# same as above but with explicit clsoe before crash
def crasher(sock,addr):
sock.recv(1024)
sock.close()
0/0
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, crasher)
client = eventlet.connect(('localhost', l.getsockname()[1]))
client.sendall(s2b('a'))
self.assertRaises(ZeroDivisionError, gt.wait)
self.assertFalse(client.recv(100))
def test_called_for_each_connection(self):
hits = [0]
def counter(sock, addr):
hits[0]+=1
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, counter)
for i in xrange(100):
client = eventlet.connect(('localhost', l.getsockname()[1]))
self.assertFalse(client.recv(100))
gt.kill()
self.assertEqual(100, hits[0])
def test_blocking(self):
l = eventlet.listen(('localhost', 0))
x = eventlet.with_timeout(0.01,
eventlet.serve, l, lambda c,a: None,
timeout_value="timeout")
self.assertEqual(x, "timeout")
def test_raising_stopserve(self):
def stopit(conn, addr):
raise eventlet.StopServe()
l = eventlet.listen(('localhost', 0))
# connect to trigger a call to stopit
gt = eventlet.spawn(eventlet.connect,
('localhost', l.getsockname()[1]))
eventlet.serve(l, stopit)
gt.wait()
def test_concurrency(self):
evt = event.Event()
def waiter(sock, addr):
sock.sendall(s2b('hi'))
evt.wait()
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, waiter, 5)
def test_client():
c = eventlet.connect(('localhost', l.getsockname()[1]))
# verify the client is connected by getting data
self.assertEquals(s2b('hi'), c.recv(2))
return c
clients = [test_client() for i in xrange(5)]
# very next client should not get anything
x = eventlet.with_timeout(0.01,
test_client,
timeout_value="timed out")
self.assertEquals(x, "timed out")