tests/isolated for tests sandboxed in separate python process

API:
- write main test body into tests/isolated/filename.py
- it must write 'pass' to stdout or 'skip:[optional reason]'
- write a test with single line: tests.run_isolated('filename.py')
FIXME: autorun all files in tests/isolated

This deprecates tests.run_python and ProcessBase.
TODO: rewrite old multiline string test bodies to isolated files
TODO: add timeout to p.communicate() in run_python()
This commit is contained in:
Sergey Shepelev
2015-02-24 23:59:43 +03:00
parent 8656b51a0d
commit 3316c6e9d1
11 changed files with 137 additions and 147 deletions

View File

@@ -3,7 +3,9 @@ from __future__ import print_function
import contextlib
import errno
import functools
import gc
import json
import os
try:
import resource
@@ -15,6 +17,8 @@ import sys
import unittest
import warnings
from nose.plugins.skip import SkipTest
import eventlet
from eventlet import tpool
@@ -38,22 +42,14 @@ def assert_raises(exc_type):
assert False, 'Expected exception {0}'.format(name)
def skipped(func):
""" Decorator that marks a function as skipped. Uses nose's SkipTest exception
if installed. Without nose, this will count skipped tests as passing tests."""
try:
from nose.plugins.skip import SkipTest
def skipped(func, *decorator_args):
"""Decorator that marks a function as skipped.
"""
@functools.wraps(func)
def wrapped(*a, **k):
raise SkipTest(*decorator_args)
def skipme(*a, **k):
raise SkipTest()
skipme.__name__ = func.__name__
return skipme
except ImportError:
# no nose, we'll just skip the test ourselves
def skipme(*a, **k):
print(("Skipping {0}".format(func.__name__)))
skipme.__name__ = func.__name__
return skipme
return wrapped
def skip_if(condition):
@@ -63,16 +59,16 @@ def skip_if(condition):
should return True to skip the test.
"""
def skipped_wrapper(func):
@functools.wraps(func)
def wrapped(*a, **kw):
if isinstance(condition, bool):
result = condition
else:
result = condition(func)
if result:
return skipped(func)(*a, **kw)
raise SkipTest()
else:
return func(*a, **kw)
wrapped.__name__ = func.__name__
return wrapped
return skipped_wrapper
@@ -84,16 +80,16 @@ def skip_unless(condition):
should return True if the condition is satisfied.
"""
def skipped_wrapper(func):
@functools.wraps(func)
def wrapped(*a, **kw):
if isinstance(condition, bool):
result = condition
else:
result = condition(func)
if not result:
return skipped(func)(*a, **kw)
raise SkipTest()
else:
return func(*a, **kw)
wrapped.__name__ = func.__name__
return wrapped
return skipped_wrapper
@@ -271,19 +267,10 @@ def get_database_auth():
".test_dbauth", which contains a json map of parameters to the
connect function.
"""
import os
retval = {
'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
'psycopg2': {'user': 'test'},
}
try:
import json
except ImportError:
try:
import simplejson as json
except ImportError:
print("No json implementation, using baked-in db credentials.")
return retval
if 'EVENTLET_DB_TEST_AUTH' in os.environ:
return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
@@ -309,9 +296,9 @@ def run_python(path):
if not path.endswith('.py'):
path += '.py'
path = os.path.abspath(path)
dir_ = os.path.dirname(path)
src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
new_env = os.environ.copy()
new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [dir_])
new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])
p = subprocess.Popen(
[sys.executable, path],
env=new_env,
@@ -323,5 +310,16 @@ def run_python(path):
return output
def run_isolated(path, prefix='tests/isolated/'):
output = run_python(prefix + path).rstrip()
if output.startswith(b'skip'):
parts = output.split(b':', 1)
skip_args = []
if len(parts) > 1:
skip_args.append(parts[1])
raise SkipTest(*skip_args)
assert output == b'pass', output
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

View File

View File

@@ -0,0 +1,14 @@
from __future__ import print_function
# no standard tests in this file, ignore
__test__ = False
if __name__ == '__main__':
import MySQLdb as m
from eventlet import patcher
from eventlet.green import MySQLdb as gm
patcher.monkey_patch(all=True, MySQLdb=True)
patched_set = set(patcher.already_patched) - set(['psycopg'])
assert patched_set == frozenset(['MySQLdb', 'os', 'select', 'socket', 'thread', 'time'])
assert m.connect == gm.connect
print('pass')

View File

@@ -27,4 +27,4 @@ if __name__ == '__main__':
do_import()
thread.join()
print('ok')
print('pass')

View File

@@ -22,14 +22,10 @@ server / client accept() conn - ExplodingConnectionWrap
V V V
connection makefile() file objects - ExplodingSocketFile <-- these raise
"""
from __future__ import print_function
import socket
import eventlet
from eventlet.support import six
import socket
import sys
import tests.wsgi_test
@@ -37,6 +33,17 @@ import tests.wsgi_test
__test__ = False
TAG_BOOM = "=== ~* BOOM *~ ==="
output_buffer = []
class BufferLog(object):
@staticmethod
def write(s):
output_buffer.append(s.rstrip())
# This test might make you wince
class NaughtySocketAcceptWrap(object):
# server's socket.accept(); patches resulting connection sockets
@@ -54,12 +61,12 @@ class NaughtySocketAcceptWrap(object):
conn_wrap.unwrap()
def arm(self):
print("ca-click")
output_buffer.append("ca-click")
for i in self.conn_reg:
i.arm()
def __call__(self):
print(self.__class__.__name__ + ".__call__")
output_buffer.append(self.__class__.__name__ + ".__call__")
conn, addr = self.sock._really_accept()
self.conn_reg.append(ExplodingConnectionWrap(conn))
return conn, addr
@@ -82,12 +89,12 @@ class ExplodingConnectionWrap(object):
del self.conn._really_makefile
def arm(self):
print("tick")
output_buffer.append("tick")
for i in self.file_reg:
i.arm()
def __call__(self, mode='r', bufsize=-1):
print(self.__class__.__name__ + ".__call__")
output_buffer.append(self.__class__.__name__ + ".__call__")
# file_obj = self.conn._really_makefile(*args, **kwargs)
file_obj = ExplodingSocketFile(self.conn._sock, mode, bufsize)
self.file_reg.append(file_obj)
@@ -102,65 +109,83 @@ class ExplodingSocketFile(eventlet.greenio._fileobject):
self.armed = False
def arm(self):
print("beep")
output_buffer.append("beep")
self.armed = True
def _fuse(self):
if self.armed:
print("=== ~* BOOM *~ ===")
output_buffer.append(TAG_BOOM)
raise socket.timeout("timed out")
def readline(self, *args, **kwargs):
print(self.__class__.__name__ + ".readline")
output_buffer.append(self.__class__.__name__ + ".readline")
self._fuse()
return super(self.__class__, self).readline(*args, **kwargs)
if __name__ == '__main__':
for debug in (False, True):
print("SEPERATOR_SENTINEL")
print("debug set to: %s" % debug)
def step(debug):
output_buffer[:] = []
server_sock = eventlet.listen(('localhost', 0))
server_addr = server_sock.getsockname()
sock_wrap = NaughtySocketAcceptWrap(server_sock)
server_sock = eventlet.listen(('localhost', 0))
server_addr = server_sock.getsockname()
sock_wrap = NaughtySocketAcceptWrap(server_sock)
eventlet.spawn_n(
eventlet.wsgi.server,
debug=debug,
log=sys.stdout,
max_size=128,
site=tests.wsgi_test.Site(),
sock=server_sock,
)
eventlet.spawn_n(
eventlet.wsgi.server,
debug=debug,
log=BufferLog,
max_size=128,
site=tests.wsgi_test.Site(),
sock=server_sock,
)
try:
# req #1 - normal
sock1 = eventlet.connect(server_addr)
sock1.settimeout(0.1)
fd1 = sock1.makefile('rwb')
fd1.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
fd1.flush()
tests.wsgi_test.read_http(sock1)
# let the server socket ops catch up, set bomb
eventlet.sleep(0)
output_buffer.append("arming...")
sock_wrap.arm()
# req #2 - old conn, post-arm - timeout
fd1.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
fd1.flush()
try:
# req #1 - normal
sock1 = eventlet.connect(server_addr)
sock1.settimeout(0.1)
fd1 = sock1.makefile('rwb')
fd1.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
fd1.flush()
tests.wsgi_test.read_http(sock1)
assert False, 'Expected ConnectionClosed exception'
except tests.wsgi_test.ConnectionClosed:
pass
# let the server socket ops catch up, set bomb
eventlet.sleep(0)
print("arming...")
sock_wrap.arm()
fd1.close()
sock1.close()
finally:
# reset streams, then output trapped tracebacks
sock_wrap.unwrap()
# check output asserts in tests.wsgi_test.TestHttpd
# test_143_server_connection_timeout_exception
# req #2 - old conn, post-arm - timeout
fd1.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
fd1.flush()
try:
tests.wsgi_test.read_http(sock1)
assert False, 'Expected ConnectionClosed exception'
except tests.wsgi_test.ConnectionClosed:
pass
return output_buffer[:]
fd1.close()
sock1.close()
finally:
# reset streams, then output trapped tracebacks
sock_wrap.unwrap()
# check output asserts in tests.wsgi_test.TestHttpd
# test_143_server_connection_timeout_exception
def main():
output_normal = step(debug=False)
output_debug = step(debug=True)
assert "timed out" in output_debug[-1], repr(output_debug)
# if the BOOM check fails, it's because our timeout didn't happen
# (if eventlet stops using file.readline() to read HTTP headers,
# for instance)
assert TAG_BOOM == output_debug[-2], repr(output_debug)
assert TAG_BOOM == output_normal[-1], repr(output_normal)
assert "Traceback" not in output_debug, repr(output_debug)
assert "Traceback" not in output_normal, repr(output_normal)
print("pass")
if __name__ == '__main__':
main()

View File

@@ -6,15 +6,12 @@ import traceback
import eventlet
from eventlet import event
from tests import (
LimitedTestCase,
run_python,
skip_unless, using_pyevent, get_database_auth,
)
try:
from eventlet.green import MySQLdb
except ImportError:
MySQLdb = False
import tests
from tests import skip_unless, using_pyevent, get_database_auth
def mysql_requirement(_f):
@@ -40,7 +37,7 @@ def mysql_requirement(_f):
return False
class TestMySQLdb(LimitedTestCase):
class TestMySQLdb(tests.LimitedTestCase):
def setUp(self):
self._auth = get_database_auth()['MySQLdb']
self.create_db()
@@ -229,16 +226,7 @@ class TestMySQLdb(LimitedTestCase):
conn.commit()
class TestMonkeyPatch(LimitedTestCase):
class TestMonkeyPatch(tests.LimitedTestCase):
@skip_unless(mysql_requirement)
def test_monkey_patching(self):
testcode_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'mysqldb_test_monkey_patch.py',
)
output = run_python(testcode_path)
lines = output.splitlines()
self.assertEqual(len(lines), 2, output)
self.assertEqual(lines[0].replace("psycopg,", ""),
'mysqltest MySQLdb,os,select,socket,thread,time')
self.assertEqual(lines[1], "connect True")
tests.run_isolated('mysqldb_monkey_patch.py')

View File

@@ -1,12 +0,0 @@
from __future__ import print_function
from eventlet import patcher
# no standard tests in this file, ignore
__test__ = False
if __name__ == '__main__':
import MySQLdb as m
from eventlet.green import MySQLdb as gm
patcher.monkey_patch(all=True, MySQLdb=True)
print("mysqltest {0}".format(",".join(sorted(patcher.already_patched.keys()))))
print("connect {0}".format(m.connect == gm.connect))

View File

@@ -4,7 +4,7 @@ import sys
import tempfile
from eventlet.support import six
from tests import LimitedTestCase, main, run_python, skip_with_pyevent
import tests
base_module_contents = """
@@ -29,7 +29,7 @@ print("importing {0} {1} {2} {3}".format(patching, socket, patching.socket, patc
"""
class ProcessBase(LimitedTestCase):
class ProcessBase(tests.LimitedTestCase):
TEST_TIMEOUT = 3 # starting processes is time-consuming
def setUp(self):
@@ -51,7 +51,7 @@ class ProcessBase(LimitedTestCase):
def launch_subprocess(self, filename):
path = os.path.join(self.tempdir, filename)
output = run_python(path)
output = tests.run_python(path)
if six.PY3:
output = output.decode('utf-8')
separator = '\n'
@@ -246,7 +246,7 @@ def test_monkey_patch_threading():
class Tpool(ProcessBase):
TEST_TIMEOUT = 3
@skip_with_pyevent
@tests.skip_with_pyevent
def test_simple(self):
new_mod = """
import eventlet
@@ -264,7 +264,7 @@ tpool.killall()
assert '2' in lines[0], repr(output)
assert '3' in lines[1], repr(output)
@skip_with_pyevent
@tests.skip_with_pyevent
def test_unpatched_thread(self):
new_mod = """import eventlet
eventlet.monkey_patch(time=False, thread=False)
@@ -277,7 +277,7 @@ import time
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, lines)
@skip_with_pyevent
@tests.skip_with_pyevent
def test_patched_thread(self):
new_mod = """import eventlet
eventlet.monkey_patch(time=False, thread=True)
@@ -497,9 +497,4 @@ t2.join()
def test_importlib_lock():
output = run_python('tests/patcher_test_importlib_lock.py')
assert output.rstrip() == b'ok'
if __name__ == '__main__':
main()
tests.run_isolated('patcher_importlib_lock.py')

View File

@@ -1,14 +1,11 @@
import errno
import struct
from nose.tools import eq_
import eventlet
from eventlet import event
from eventlet import websocket
from eventlet.green import httplib
from eventlet.green import socket
from eventlet import websocket
from eventlet.support import six
from tests.wsgi_test import _TestBase
@@ -129,11 +126,11 @@ class TestWebSocket(_TestBase):
sock.recv(1024)
ws = websocket.RFC6455WebSocket(sock, {}, client=True)
ws.send(b'hello')
eq_(ws.wait(), b'hello')
assert ws.wait() == b'hello'
ws.send(b'hello world!\x01')
ws.send(u'hello world again!')
eq_(ws.wait(), b'hello world!\x01')
eq_(ws.wait(), u'hello world again!')
assert ws.wait() == b'hello world!\x01'
assert ws.wait() == u'hello world again!'
ws.close()
eventlet.sleep(0.01)

View File

@@ -1455,22 +1455,7 @@ class TestHttpd(_TestBase):
# Handle connection socket timeouts
# https://bitbucket.org/eventlet/eventlet/issue/143/
# Runs tests.wsgi_test_conntimeout in a separate process.
testcode_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'wsgi_test_conntimeout.py')
output = tests.run_python(testcode_path)
sections = output.split(b"SEPERATOR_SENTINEL")
# first section is empty
self.assertEqual(3, len(sections), output)
# if the "BOOM" check fails, it's because our timeout didn't happen
# (if eventlet stops using file.readline() to read HTTP headers,
# for instance)
for runlog in sections[1:]:
debug = False if b"debug set to: False" in runlog else True
if debug:
self.assertTrue(b"timed out" in runlog)
self.assertTrue(b"BOOM" in runlog)
self.assertFalse(b"Traceback" in runlog)
tests.run_isolated('wsgi_connection_timeout.py')
def test_server_socket_timeout(self):
self.spawn_server(socket_timeout=0.1)

View File

@@ -48,6 +48,6 @@ deps =
py26: MySQL-python==1.2.5
py27: MySQL-python==1.2.5
commands =
nosetests --verbose tests/
nosetests --verbose {posargs:tests/}
nosetests --verbose --with-doctest eventlet/coros.py eventlet/event.py \
eventlet/pools.py eventlet/queue.py eventlet/timeout.py