diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 6829ab8..647fcbc 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -561,14 +561,25 @@ def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, pass +class StopServe(Exception): pass + def serve(sock, handle, concurrency=1000): - """Runs a server on the supplied socket. Calls the function - *handle* in a separate greenthread for every incoming request. - This function blocks the calling greenthread; it won't return until + """Runs a server on the supplied socket. Calls the function + *handle* in a separate greenthread for every incoming request with + two arguments: the client socket object, and the client address:: + + def myhandle(client_sock, client_addr): + print "client connected", client_addr + + eventlet.serve(eventlet.listen(('127.0.0.1', 9999)), myhandle) + + Returning from *handle* closes the client socket. + + :func:`serve` blocks the calling greenthread; it won't return until the server completes. If you desire an immediate return, spawn a new greenthread for :func:`serve`. - - The *handle* function must raise an EndServerException to + + The *handle* function must raise a StopServe exception to gracefully terminate the server -- that's the only way to get the server() function to return. Any other uncaught exceptions raised in *handle* are raised as exceptions from :func:`serve`, so be @@ -580,4 +591,24 @@ def serve(sock, handle, concurrency=1000): the server hits the concurrency limit, it stops accepting new connections until the existing ones complete. """ - pass + from eventlet import greenpool + from eventlet import greenthread + pool = greenpool.GreenPool(concurrency) + server_thread = greenthread.getcurrent() + + def stop_checker(t, server_thread, conn): + try: + t.wait() + except greenthread.greenlet.GreenletExit: + pass + except Exception: + conn.close() + server_thread.throw(*sys.exc_info()) + + while True: + try: + conn, addr = sock.accept() + pool.spawn(handle, conn, addr).link(stop_checker, server_thread, conn) + conn, addr = None, None + except StopServe: + return diff --git a/tests/greenio_test.py b/tests/greenio_test.py index cee6f27..7afda9c 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -524,5 +524,58 @@ class TestGreenIoLong(LimitedTestCase): self.assert_(len(results2) > 0) +class TestServe(LimitedTestCase): + def setUp(self): + super(TestServe, self).setUp() + from eventlet import debug + debug.hub_exceptions(False) + + def tearDown(self): + super(TestServe, self).tearDown() + from eventlet import debug + debug.hub_exceptions(True) + + def test_exiting_server(self): + # tests that the server closes the client sock on handle() exit + def closer(sock,addr): + pass + + l = eventlet.listen(('localhost', 0)) + gt = eventlet.spawn(greenio.serve, l, closer) + client = eventlet.connect(('localhost', l.getsockname()[1])) + client.sendall('a') + self.assertEqual('', client.recv(100)) + gt.kill() + + + def test_excepting_server(self): + # tests that the server closes the client sock on handle() exception + def crasher(sock,addr): + x = sock.recv(1024) + 0/0 + + l = eventlet.listen(('localhost', 0)) + gt = eventlet.spawn(greenio.serve, l, crasher) + client = eventlet.connect(('localhost', l.getsockname()[1])) + client.sendall('a') + self.assertRaises(ZeroDivisionError, gt.wait) + self.assertEqual('', client.recv(100)) + + def test_excepting_server_already_closed(self): + # tests that the server closes the client sock on handle() exception + def crasher(sock,addr): + x = sock.recv(1024) + sock.close() + 0/0 + + l = eventlet.listen(('localhost', 0)) + gt = eventlet.spawn(greenio.serve, l, crasher) + client = eventlet.connect(('localhost', l.getsockname()[1])) + client.sendall('a') + self.assertRaises(ZeroDivisionError, gt.wait) + self.assertEqual('', client.recv(100)) + + + if __name__ == '__main__': main()