Files
deb-python-eventlet/tests/api_test.py
Jakub Stasiak 6afd8bdee2 Python 3 compatibility fixes
Closes GH-102
Closes GH-103
Closes GH-104
2014-07-18 02:12:53 +04:00

199 lines
5.7 KiB
Python

import os
import socket
from unittest import TestCase, main
import warnings
import eventlet
from eventlet import greenio, util, hubs, greenthread, spawn
from tests import skip_if_no_ssl
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import api
warnings.simplefilter('default', DeprecationWarning)
def check_hub():
# Clear through the descriptor queue
api.sleep(0)
api.sleep(0)
hub = hubs.get_hub()
for nm in 'get_readers', 'get_writers':
dct = getattr(hub, nm)()
assert not dct, "hub.%s not empty: %s" % (nm, dct)
# Stop the runloop (unless it's twistedhub which does not support that)
if not getattr(hub, 'uses_twisted_reactor', None):
hub.abort(True)
assert not hub.running
class TestApi(TestCase):
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
def test_tcp_listener(self):
socket = eventlet.listen(('0.0.0.0', 0))
assert socket.getsockname()[0] == '0.0.0.0'
socket.close()
check_hub()
def test_connect_tcp(self):
def accept_once(listenfd):
try:
conn, addr = listenfd.accept()
fd = conn.makefile(mode='wb')
conn.close()
fd.write(b'hello\n')
fd.close()
finally:
listenfd.close()
server = eventlet.listen(('0.0.0.0', 0))
api.spawn(accept_once, server)
client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile('rb')
client.close()
assert fd.readline() == b'hello\n'
assert fd.read() == b''
fd.close()
check_hub()
@skip_if_no_ssl
def test_connect_ssl(self):
def accept_once(listenfd):
try:
conn, addr = listenfd.accept()
conn.write(b'hello\r\n')
greenio.shutdown_safe(conn)
conn.close()
finally:
greenio.shutdown_safe(listenfd)
listenfd.close()
server = api.ssl_listener(('0.0.0.0', 0),
self.certificate_file,
self.private_key_file)
api.spawn(accept_once, server)
raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
client = util.wrap_ssl(raw_client)
fd = socket._fileobject(client, 'rb', 8192)
assert fd.readline() == b'hello\r\n'
try:
self.assertEqual(b'', fd.read(10))
except greenio.SSL.ZeroReturnError:
# if it's a GreenSSL object it'll do this
pass
greenio.shutdown_safe(client)
client.close()
check_hub()
def test_001_trampoline_timeout(self):
server_sock = eventlet.listen(('127.0.0.1', 0))
bound_port = server_sock.getsockname()[1]
def server(sock):
client, addr = sock.accept()
api.sleep(0.1)
server_evt = spawn(server, server_sock)
api.sleep(0)
try:
desc = eventlet.connect(('127.0.0.1', bound_port))
api.trampoline(desc, read=True, write=False, timeout=0.001)
except api.TimeoutError:
pass # test passed
else:
assert False, "Didn't timeout"
server_evt.wait()
check_hub()
def test_timeout_cancel(self):
server = eventlet.listen(('0.0.0.0', 0))
bound_port = server.getsockname()[1]
done = [False]
def client_closer(sock):
while True:
(conn, addr) = sock.accept()
conn.close()
def go():
desc = eventlet.connect(('127.0.0.1', bound_port))
try:
api.trampoline(desc, read=True, timeout=0.1)
except api.TimeoutError:
assert False, "Timed out"
server.close()
desc.close()
done[0] = True
greenthread.spawn_after_local(0, go)
server_coro = api.spawn(client_closer, server)
while not done[0]:
api.sleep(0)
api.kill(server_coro)
check_hub()
def test_named(self):
named_foo = api.named('tests.api_test.Foo')
self.assertEqual(named_foo.__name__, "Foo")
def test_naming_missing_class(self):
self.assertRaises(
ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
def test_killing_dormant(self):
DELAY = 0.1
state = []
def test():
try:
state.append('start')
api.sleep(DELAY)
except:
state.append('except')
# catching GreenletExit
pass
# when switching to hub, hub makes itself the parent of this greenlet,
# thus after the function's done, the control will go to the parent
api.sleep(0)
state.append('finished')
g = api.spawn(test)
api.sleep(DELAY / 2)
self.assertEqual(state, ['start'])
api.kill(g)
# will not get there, unless switching is explicitly scheduled by kill
self.assertEqual(state, ['start', 'except'])
api.sleep(DELAY)
self.assertEqual(state, ['start', 'except', 'finished'])
def test_nested_with_timeout(self):
def func():
return api.with_timeout(0.2, api.sleep, 2, timeout_value=1)
try:
api.with_timeout(0.1, func)
self.fail(u'Expected api.TimeoutError')
except api.TimeoutError:
pass
class Foo(object):
pass
if __name__ == '__main__':
main()