Files
deb-python-eventlet/tests/with_timeout.py
2009-07-27 15:40:30 -07:00

220 lines
6.7 KiB
Python

#!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
Run Python script in a child process. Kill it after timeout has elapsed.
If the script was running unittest test cases, the test case that timed out is
disabled and the script is restarted.
Usage: %prog [-t TIMEOUT] program.py [args]
If program.py timed out, return 7
If program.py exited with non-zero value, return 8
If program.py exited with zero value after several runs, return 9
If program.py exited with non-zero value after several runs, return 10
"""
import sys
import os
import time
import warnings
# An optional leading "-t SECONDS" selects the per-run timeout.  Both words are
# removed from sys.argv so that sys.argv[1] is the script to run afterwards.
if sys.argv[1:2] == ['-t']:
    del sys.argv[1]
    try:
        TIMEOUT = int(sys.argv[1])
    except (IndexError, ValueError):
        # Missing or non-numeric value: fail with a readable message instead
        # of an unexplained traceback.
        sys.exit('%s: -t requires an integer number of seconds' % sys.argv[0])
    del sys.argv[1]
else:
    TIMEOUT = 20

# The target script is executed with this module's globals and may be this very
# file (see the --selftest examples in _test), so the shared state below is
# only initialised when it is not already defined.
try:
    disabled_tests
except NameError:
    # Names ("Class.method") of the tests that timed out and must be skipped.
    disabled_tests = []

try:
    CURRENT_TEST_FILENAME
except NameError:
    import atexit
    import tempfile

    # File holding the name of the test that is currently running.  mkstemp
    # creates it atomically under an unpredictable name, unlike os.tmpnam(),
    # which only returns a name and is open to a symlink race.
    _marker_fd, CURRENT_TEST_FILENAME = tempfile.mkstemp(prefix='with_timeout-')
    os.close(_marker_fd)
    del _marker_fd

    def _remove_current_test_file(path=CURRENT_TEST_FILENAME, owner=os.getpid()):
        """Delete the marker file when the process that created it exits.

        Forked children inherit this handler; the pid check keeps them from
        removing the marker while the parent may still need to read it.
        """
        if os.getpid() != owner:
            return
        try:
            os.unlink(path)
        except OSError:
            # Already removed (the normal case once a run has started).
            pass

    atexit.register(_remove_current_test_file)
class Alarm(Exception):
    """Raised by the SIGALRM handler ``al`` once the timeout has elapsed."""
def al(*args):
    """Signal handler: ignore the handler arguments and raise ``Alarm``."""
    raise Alarm()
def _test():
    """
    Self-test: every example runs this script through ``system`` and checks
    the ``(exit code, elapsed whole seconds)`` pair it returns.

    >>> system('./with_timeout.py -t 3 __init__.py')
    (0, 0)
    >>> system('./with_timeout.py -t 3 /usr/lib/python2.5/BaseHTTPServer.py 0')
    (7, 3)
    >>> system('./with_timeout.py -t 3 with_timeout.py --selftest1')
    (9, 3)
    >>> system('./with_timeout.py -t 3 with_timeout.py --selftest2')
    (10, 3)
    >>> system('./with_timeout.py -t 3 with_timeout.py no_such_file.xxx')
    (8, 0)
    """
    # With no arguments testmod() checks the __main__ module, i.e. this script.
    from doctest import testmod
    testmod()
if len(sys.argv) < 2:
    # No script given: run the doctests, then exit showing the usage text.
    def system(*args):
        """Run a shell command; return (exit code, elapsed whole seconds)."""
        began = time.time()
        wait_status = os.system(*args)
        elapsed = int(time.time() - began)
        # The exit code is in the high byte of the status from os.system().
        return wait_status >> 8, elapsed

    _test()
    sys.exit(__doc__.replace('%prog', sys.argv[0]))
elif sys.argv[1:] == ['--selftest1']:
    # Self-test fixture: one passing test and one that sleeps for 10 seconds.
    import unittest

    class Test(unittest.TestCase):

        def test_long(self):
            time.sleep(10)

        def test1(self):
            pass

    from tests import test_support
    test_support.run_unittest(Test)
    sys.exit(0)
elif sys.argv[1:] == ['--selftest2']:
    # Self-test fixture: one failing test and one that sleeps for 10 seconds.
    import unittest

    class Test(unittest.TestCase):

        def test_long(self):
            time.sleep(10)

        def test_fail(self):
            # Undefined name on purpose: the NameError makes this test fail.
            fail

    from tests import test_support
    test_support.run_unittest(Test)
    sys.exit(0)

# Normal mode: remember the script to run and shift argv so that the script
# sees itself as sys.argv[0].
filename = sys.argv[1]
sys.argv.pop(0)
def execf():
    """Run the target script in this process with ``unittest.TestCase`` patched.

    The patched ``TestCase.run`` skips every test whose ``Class.method`` name
    is listed in ``disabled_tests``.  While a test runs, its name is kept in
    the file ``CURRENT_TEST_FILENAME``; the file is removed again when the
    test finishes, so a file left behind names the test that was running.

    The script named by ``filename`` is then executed with this module's
    globals.
    """
    def patch_unittest():
        """Rebind ``unittest.TestCase`` to a subclass that reports the running test."""
        import unittest

        class TestCase(unittest.TestCase):
            # The genuine class; ``unittest.TestCase`` itself is rebound below.
            base = unittest.TestCase

            def run(self, result=None):
                try:
                    testMethodName = self._testMethodName
                except AttributeError:
                    # Fallback for a unittest that keeps the name in a private
                    # attribute (presumably older releases).  Because this
                    # class is also named TestCase, the reference below is
                    # name-mangled to _TestCase__testMethodName.
                    testMethodName = self.__testMethodName
                name = "%s.%s" % (self.__class__.__name__, testMethodName)
                if name in disabled_tests:
                    # Timed out in an earlier run: skip it silently.
                    return
                # Emits the name followed by two spaces and a newline.
                print('%s  ' % name)
                sys.stdout.flush()
                # Record the running test; close the file explicitly so the
                # name is on disk before the test body starts.
                marker = open(CURRENT_TEST_FILENAME, 'w')
                try:
                    marker.write(name)
                finally:
                    marker.close()
                try:
                    return self.base.run(self, result)
                finally:
                    sys.stdout.flush()
                    try:
                        os.unlink(CURRENT_TEST_FILENAME)
                    except OSError:
                        # Best effort: the marker may already be gone.
                        pass

        unittest.TestCase = TestCase

    patch_unittest()
    execfile(filename, globals())
# Main loop: fork a child that runs the script, wait for it with an alarm, and
# when it times out disable the test that was running and start over.
while True:
    # Start every attempt without a stale "currently running test" marker.
    try:
        os.unlink(CURRENT_TEST_FILENAME)
    except OSError:
        pass
    child = os.fork()
    if child == 0:
        # Child: announce the run, then execute the target script in-process.
        print('===PYTHON=%s.%s.%s' % sys.version_info[:3])
        print('===ARGV=%s' % ' '.join(sys.argv))
        print('===TIMEOUT=%r' % TIMEOUT)
        sys.stdout.flush()
        execf()
        break
    else:
        # Parent: wait for the child; SIGALRM raises Alarm via ``al``.
        start = time.time()
        import signal
        signal.signal(signal.SIGALRM, al)
        signal.alarm(TIMEOUT)
        pid = None
        try:
            pid, status = os.waitpid(child, 0)
            signal.alarm(0)
        except Alarm:
            try:
                os.kill(child, signal.SIGKILL)
            except Exception:
                pass
            else:
                # Reap the killed child so that retries do not pile up
                # zombie processes.
                try:
                    os.waitpid(child, 0)
                except OSError:
                    pass
            print('\n===%s was killed after %s seconds' % (child, time.time()-start))
            sys.stdout.flush()
            # The marker file, if present, names the test that was running.
            bad_test = None
            try:
                marker = open(CURRENT_TEST_FILENAME)
                try:
                    bad_test = marker.read()
                finally:
                    marker.close()
            except IOError:
                pass
            if bad_test in disabled_tests:
                print('\n===%s was disabled but it still managed to fail?!' % bad_test)
                sys.stdout.flush()
                # NOTE(review): no explicit exit status is set on this path in
                # the code visible here -- confirm that is intended.
                break
            if bad_test is None:
                # Timed out outside of any recorded test: nothing to disable.
                sys.exit(7)
            print('\n===Trying again, now without %s' % bad_test)
            sys.stdout.flush()
            disabled_tests.append(bad_test)
        except:
            # Deliberately bare: on any other exception (including
            # KeyboardInterrupt) cancel the alarm, kill the child, re-raise.
            try:
                signal.alarm(0)
            except Exception:
                pass
            try:
                os.kill(child, signal.SIGKILL)
            except Exception:
                pass
            raise
        else:
            # ``status`` is the raw value returned by os.waitpid(); it is only
            # tested for zero / non-zero below.
            print('===%s exited with code %s' % (pid, status))
            sys.stdout.flush()
            if disabled_tests:
                print('\n===disabled because of timeout: %s\n%s\n' % (len(disabled_tests), '\n'.join(disabled_tests)))
                sys.stdout.flush()
                if status:
                    retcode = 10
                else:
                    retcode = 9
            elif status:
                retcode = 8
            else:
                retcode = 0
            sys.exit(retcode)