This includes changes to WSGI, websocket, bytes/str/unicode handling, SSL, backdoor, greenio and tests. Some comments and conditionals (PY2/PY3) were added for clarity GH issues: Closes #106 Closes #111 Closes #118 Closes #141 Incidentally should also close #135 (reopen if didn't) cc #6
		
			
				
	
	
		
			141 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
 | 
						|
import eventlet
 | 
						|
from eventlet import debug, event
 | 
						|
from eventlet.green import socket
 | 
						|
from eventlet.support import six
 | 
						|
from tests import LimitedTestCase, skip_if_no_ssl
 | 
						|
 | 
						|
 | 
						|
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
 | 
						|
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
 | 
						|
 | 
						|
 | 
						|
class TestServe(LimitedTestCase):
 | 
						|
    def setUp(self):
 | 
						|
        super(TestServe, self).setUp()
 | 
						|
        debug.hub_exceptions(False)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        super(TestServe, self).tearDown()
 | 
						|
        debug.hub_exceptions(True)
 | 
						|
 | 
						|
    def test_exiting_server(self):
 | 
						|
        # tests that the server closes the client sock on handle() exit
 | 
						|
        def closer(sock, addr):
 | 
						|
            pass
 | 
						|
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        gt = eventlet.spawn(eventlet.serve, l, closer)
 | 
						|
        client = eventlet.connect(('localhost', l.getsockname()[1]))
 | 
						|
        client.sendall(b'a')
 | 
						|
        self.assertFalse(client.recv(100))
 | 
						|
        gt.kill()
 | 
						|
 | 
						|
    def test_excepting_server(self):
 | 
						|
        # tests that the server closes the client sock on handle() exception
 | 
						|
        def crasher(sock, addr):
 | 
						|
            sock.recv(1024)
 | 
						|
            0 // 0
 | 
						|
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        gt = eventlet.spawn(eventlet.serve, l, crasher)
 | 
						|
        client = eventlet.connect(('localhost', l.getsockname()[1]))
 | 
						|
        client.sendall(b'a')
 | 
						|
        self.assertRaises(ZeroDivisionError, gt.wait)
 | 
						|
        self.assertFalse(client.recv(100))
 | 
						|
 | 
						|
    def test_excepting_server_already_closed(self):
 | 
						|
        # same as above but with explicit clsoe before crash
 | 
						|
        def crasher(sock, addr):
 | 
						|
            sock.recv(1024)
 | 
						|
            sock.close()
 | 
						|
            0 // 0
 | 
						|
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        gt = eventlet.spawn(eventlet.serve, l, crasher)
 | 
						|
        client = eventlet.connect(('localhost', l.getsockname()[1]))
 | 
						|
        client.sendall(b'a')
 | 
						|
        self.assertRaises(ZeroDivisionError, gt.wait)
 | 
						|
        self.assertFalse(client.recv(100))
 | 
						|
 | 
						|
    def test_called_for_each_connection(self):
 | 
						|
        hits = [0]
 | 
						|
 | 
						|
        def counter(sock, addr):
 | 
						|
            hits[0] += 1
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        gt = eventlet.spawn(eventlet.serve, l, counter)
 | 
						|
        for i in six.moves.range(100):
 | 
						|
            client = eventlet.connect(('localhost', l.getsockname()[1]))
 | 
						|
            self.assertFalse(client.recv(100))
 | 
						|
        gt.kill()
 | 
						|
        self.assertEqual(100, hits[0])
 | 
						|
 | 
						|
    def test_blocking(self):
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        x = eventlet.with_timeout(
 | 
						|
            0.01,
 | 
						|
            eventlet.serve, l, lambda c, a: None,
 | 
						|
            timeout_value="timeout")
 | 
						|
        self.assertEqual(x, "timeout")
 | 
						|
 | 
						|
    def test_raising_stopserve(self):
 | 
						|
        def stopit(conn, addr):
 | 
						|
            raise eventlet.StopServe()
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        # connect to trigger a call to stopit
 | 
						|
        gt = eventlet.spawn(eventlet.connect, ('localhost', l.getsockname()[1]))
 | 
						|
        eventlet.serve(l, stopit)
 | 
						|
        gt.wait()
 | 
						|
 | 
						|
    def test_concurrency(self):
 | 
						|
        evt = event.Event()
 | 
						|
 | 
						|
        def waiter(sock, addr):
 | 
						|
            sock.sendall(b'hi')
 | 
						|
            evt.wait()
 | 
						|
        l = eventlet.listen(('localhost', 0))
 | 
						|
        eventlet.spawn(eventlet.serve, l, waiter, 5)
 | 
						|
 | 
						|
        def test_client():
 | 
						|
            c = eventlet.connect(('localhost', l.getsockname()[1]))
 | 
						|
            # verify the client is connected by getting data
 | 
						|
            self.assertEqual(b'hi', c.recv(2))
 | 
						|
            return c
 | 
						|
        [test_client() for i in range(5)]
 | 
						|
        # very next client should not get anything
 | 
						|
        x = eventlet.with_timeout(
 | 
						|
            0.01,
 | 
						|
            test_client,
 | 
						|
            timeout_value="timed out")
 | 
						|
        self.assertEqual(x, "timed out")
 | 
						|
 | 
						|
    @skip_if_no_ssl
 | 
						|
    def test_wrap_ssl(self):
 | 
						|
        server = eventlet.wrap_ssl(
 | 
						|
            eventlet.listen(('localhost', 0)),
 | 
						|
            certfile=certificate_file, keyfile=private_key_file,
 | 
						|
            server_side=True)
 | 
						|
        port = server.getsockname()[1]
 | 
						|
 | 
						|
        def handle(sock, addr):
 | 
						|
            sock.sendall(sock.recv(1024))
 | 
						|
            raise eventlet.StopServe()
 | 
						|
 | 
						|
        eventlet.spawn(eventlet.serve, server, handle)
 | 
						|
        client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
 | 
						|
        client.sendall(b"echo")
 | 
						|
        self.assertEqual(b"echo", client.recv(1024))
 | 
						|
 | 
						|
    def test_socket_reuse(self):
 | 
						|
        lsock1 = eventlet.listen(('localhost', 0))
 | 
						|
        port = lsock1.getsockname()[1]
 | 
						|
 | 
						|
        def same_socket():
 | 
						|
            return eventlet.listen(('localhost', port))
 | 
						|
 | 
						|
        self.assertRaises(socket.error, same_socket)
 | 
						|
        lsock1.close()
 | 
						|
        assert same_socket()
 |