remove greentest/tests.py

This commit is contained in:
Denis Bilenko
2009-06-15 15:12:10 +07:00
parent f7e1adeb18
commit 686152355d
13 changed files with 74 additions and 158 deletions

View File

@@ -21,6 +21,8 @@
# package is named greentest, not test, so it won't be confused with test in stdlib
import sys
import os
import errno
import unittest
disabled_marker = '-*-*-*-*-*- disabled -*-*-*-*-*-'
@@ -45,4 +47,10 @@ class LimitedTestCase(unittest.TestCase):
def tearDown(self):
self.timer.cancel()
def find_command(command):
for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
p = os.path.join(dir, command)
if os.access(p, os.X_OK):
return p
raise IOError(errno.ENOENT, 'Command not found: %r' % command)

View File

@@ -21,10 +21,10 @@
import os
import os.path
from unittest import TestCase, main
from eventlet import api
from eventlet import greenio
from greentest import tests
from eventlet import util
@@ -43,7 +43,7 @@ def check_hub():
assert not api.get_hub().running
class TestApi(tests.TestCase):
class TestApi(TestCase):
mode = 'static'
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
@@ -218,5 +218,5 @@ class Foo(object):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -19,10 +19,10 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from greentest import tests
from unittest import TestCase, main
from eventlet import coros, api
class TestEvent(tests.TestCase):
class TestEvent(TestCase):
mode = 'static'
def setUp(self):
# raise an exception if we're waiting forever
@@ -115,7 +115,7 @@ class IncrActor(coros.Actor):
if evt: evt.send()
class TestActor(tests.TestCase):
class TestActor(TestCase):
mode = 'static'
def setUp(self):
# raise an exception if we're waiting forever
@@ -229,4 +229,4 @@ class TestActor(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -21,9 +21,8 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from unittest import TestCase, main
from eventlet import api, coros
from greentest import tests
from eventlet import db_pool
class DBTester(object):
@@ -513,13 +512,13 @@ class TestMysqlConnectionPool(object):
# for some reason the tpool test hangs if run after the saranwrap test
class Test01MysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, tests.TestCase):
class Test01MysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, TestCase):
pass
class Test02MysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, tests.TestCase):
class Test02MysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, TestCase):
pass
class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, tests.TestCase):
class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, TestCase):
pass
@@ -529,6 +528,6 @@ if __name__ == '__main__':
except ImportError:
print "Unable to import MySQLdb, skipping db_pool_test."
else:
tests.main()
main()
else:
import MySQLdb

View File

@@ -17,13 +17,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from greentest import tests
from unittest import TestCase, main
from eventlet import api
import socket
# TODO try and reuse unit tests from within Python itself
class TestGreenIo(tests.TestCase):
class TestGreenIo(TestCase):
def test_close_with_makefile(self):
def accept_close_early(listener):
# verify that the makefile and the socket are truly independent
@@ -98,4 +98,4 @@ class TestGreenIo(tests.TestCase):
timer.cancel()
if __name__ == '__main__':
tests.main()
main()

View File

@@ -20,6 +20,7 @@
# THE SOFTWARE.
import cgi
from unittest import TestCase, main
from eventlet import api
from eventlet import httpc
@@ -32,9 +33,6 @@ except ImportError:
from StringIO import StringIO
from greentest import tests
class Site(object):
def __init__(self):
self.stuff = {'hello': 'hello world'}
@@ -127,7 +125,7 @@ class TestBase(object):
api.kill(self.victim)
class TestHttpc(TestBase, tests.TestCase):
class TestHttpc(TestBase, TestCase):
def test_get_bad_uri(self):
self.assertRaises(httpc.NotFound,
lambda: httpc.get(self.base_url() + 'b0gu5'))
@@ -259,7 +257,7 @@ class Site307(RedirectSite):
response_code = "307 Temporary Redirect"
class TestHttpc301(TestBase, tests.TestCase):
class TestHttpc301(TestBase, TestCase):
site_class = Site301
def base_url(self):
@@ -284,7 +282,7 @@ class TestHttpc301(TestBase, tests.TestCase):
self.assertEquals(response, data)
class TestHttpc302(TestBase, tests.TestCase):
class TestHttpc302(TestBase, TestCase):
site_class = Site302
def test_get_expired(self):
@@ -306,7 +304,7 @@ class TestHttpc302(TestBase, tests.TestCase):
self.assertEquals(httpc.get(self.base_url() + 'expires/hello', max_retries=1), 'hello world')
class TestHttpc303(TestBase, tests.TestCase):
class TestHttpc303(TestBase, TestCase):
site_class = Site303
def base_url(self):
@@ -322,7 +320,7 @@ class TestHttpc303(TestBase, tests.TestCase):
self.assertEquals(response, data)
class TestHttpc307(TestBase, tests.TestCase):
class TestHttpc307(TestBase, TestCase):
site_class = Site307
def base_url(self):
@@ -344,7 +342,7 @@ class Site500(BasicSite):
return ["screw you world"]
class TestHttpc500(TestBase, tests.TestCase):
class TestHttpc500(TestBase, TestCase):
site_class = Site500
def base_url(self):
@@ -367,7 +365,7 @@ class Site504(BasicSite):
return ["screw you world"]
class TestHttpc504(TestBase, tests.TestCase):
class TestHttpc504(TestBase, TestCase):
site_class = Site504
def base_url(self):
@@ -381,7 +379,7 @@ class TestHttpc504(TestBase, tests.TestCase):
lambda: httpc.post(self.base_url(), data=data))
class TestHttpTime(tests.TestCase):
class TestHttpTime(TestCase):
rfc1123_time = 'Sun, 06 Nov 1994 08:49:37 GMT'
rfc850_time = 'Sunday, 06-Nov-94 08:49:37 GMT'
asctime_time = 'Sun Nov 6 08:49:37 1994'
@@ -395,7 +393,7 @@ class TestHttpTime(tests.TestCase):
self.assertEqual(ticks, self.secs_since_epoch)
class TestProxy(tests.TestCase):
class TestProxy(TestCase):
def test_ssl_proxy(self):
def ssl_proxy(sock):
conn, addr = sock.accept()
@@ -460,4 +458,4 @@ class TestProxy(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -19,23 +19,22 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from unittest import TestCase, main
from eventlet import api
from eventlet import httpd
from eventlet import processes
from eventlet import util
from greentest import find_command
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
util.wrap_socket_with_coroutine_socket()
from greentest import tests
class Site(object):
def handle_request(self, req):
path = req.path_segments()
@@ -86,7 +85,7 @@ def read_http(sock):
return response_line, headers, body
class TestHttpd(tests.TestCase):
class TestHttpd(TestCase):
mode = 'static'
def setUp(self):
self.logfile = StringIO()
@@ -148,7 +147,7 @@ class TestHttpd(tests.TestCase):
def skip_test_005_run_apachebench(self):
url = 'http://localhost:12346/'
# ab is apachebench
out = processes.Process(tests.find_command('ab'),
out = processes.Process(find_command('ab'),
['-c','64','-n','1024', '-k', url])
print out.read()
@@ -208,4 +207,4 @@ class TestHttpd(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -18,12 +18,12 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from unittest import TestCase, main
from eventlet import api
from eventlet import channel
from eventlet import coros
from eventlet import pools
from greentest import tests
class IntPool(pools.Pool):
def create(self):
@@ -31,7 +31,7 @@ class IntPool(pools.Pool):
return self.current_integer
class TestIntPool(tests.TestCase):
class TestIntPool(TestCase):
mode = 'static'
def setUp(self):
self.pool = IntPool(min_size=0, max_size=4)
@@ -142,7 +142,7 @@ class TestIntPool(tests.TestCase):
timer.cancel()
class TestAbstract(tests.TestCase):
class TestAbstract(TestCase):
mode = 'static'
def test_abstract(self):
## Going for 100% coverage here
@@ -151,7 +151,7 @@ class TestAbstract(tests.TestCase):
self.assertRaises(NotImplementedError, pool.get)
class TestIntPool2(tests.TestCase):
class TestIntPool2(TestCase):
mode = 'static'
def setUp(self):
self.pool = IntPool(min_size=3, max_size=3)
@@ -164,7 +164,7 @@ class TestIntPool2(tests.TestCase):
self.assertEquals(gotten, 1)
class TestOrderAsStack(tests.TestCase):
class TestOrderAsStack(TestCase):
mode = 'static'
def setUp(self):
self.pool = IntPool(max_size=3, order_as_stack=True)
@@ -183,7 +183,7 @@ class RaisePool(pools.Pool):
raise RuntimeError()
class TestCreateRaises(tests.TestCase):
class TestCreateRaises(TestCase):
mode = 'static'
def setUp(self):
self.pool = RaisePool(max_size=3)
@@ -201,7 +201,7 @@ SOMETIMES = RuntimeError('I fail half the time')
class TestTookTooLong(Exception):
pass
class TestFan(tests.TestCase):
class TestFan(TestCase):
mode = 'static'
def setUp(self):
self.timer = api.exc_after(1, TestTookTooLong())
@@ -235,5 +235,5 @@ class TestFan(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -20,11 +20,11 @@
# THE SOFTWARE.
import sys
from unittest import TestCase, main
from greentest import tests
from eventlet import processes
class TestEchoPool(tests.TestCase):
class TestEchoPool(TestCase):
def setUp(self):
self.pool = processes.ProcessPool('echo', ["hello"])
@@ -47,7 +47,7 @@ class TestEchoPool(tests.TestCase):
self.pool.put(proc)
class TestCatPool(tests.TestCase):
class TestCatPool(TestCase):
def setUp(self):
self.pool = processes.ProcessPool('cat')
@@ -88,7 +88,7 @@ class TestCatPool(tests.TestCase):
self.pool.put(proc)
class TestDyingProcessesLeavePool(tests.TestCase):
class TestDyingProcessesLeavePool(TestCase):
def setUp(self):
self.pool = processes.ProcessPool('echo', ['hello'], max_size=1)
@@ -107,7 +107,7 @@ class TestDyingProcessesLeavePool(tests.TestCase):
self.assert_(proc is not proc2)
class TestProcessLivesForever(tests.TestCase):
class TestProcessLivesForever(TestCase):
def setUp(self):
self.pool = processes.ProcessPool(sys.executable, ['-c', 'print "y"; import time; time.sleep(0.4); print "y"'], max_size=1)
@@ -131,4 +131,4 @@ class TestProcessLivesForever(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -1,86 +0,0 @@
# @author Donovan Preston
# @brief Indirection layer for tests in case we want to fix unittest.
#
# Copyright (c) 2006-2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import errno
import os
import sys
import unittest, doctest
TestCase = unittest.TestCase
name = getattr(sys.modules['__main__'], '__name__', None)
main = unittest.main
# listing of files containing doctests
doc_test_files = ['coros']
def find_command(command):
for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
p = os.path.join(dir, command)
if os.access(p, os.X_OK):
return p
raise IOError(errno.ENOENT, 'Command not found: %r' % command)
def run_all_tests(test_files = doc_test_files):
""" Runs all the unit tests, returning immediately after the
first failed test.
Returns true if the tests all succeeded. This method is really much longer
than it ought to be.
"""
eventlet_dir = os.path.realpath(os.path.dirname(__file__))
if eventlet_dir not in sys.path:
sys.path.append(eventlet_dir)
# add all _test files as a policy
import glob
test_files += [os.path.splitext(os.path.basename(x))[0]
for x in glob.glob(os.path.join(eventlet_dir, "*_test.py"))]
test_files.sort()
for test_file in test_files:
print "-=", test_file, "=-"
try:
test_module = __import__(test_file)
except ImportError:
print "Unable to import %s, skipping" % test_file
continue
if test_file.endswith('_test'):
# gawd, unittest, why you make it so difficult to just run some tests!
suite = unittest.findTestCases(test_module)
result = unittest.TextTestRunner().run(suite)
if not result.wasSuccessful():
return False
else:
failures, tests = doctest.testmod(test_module)
if failures:
return False
else:
print "OK"
return True
if __name__ == '__main__':
run_all_tests()

View File

@@ -19,12 +19,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest
from unittest import TestCase, main
from eventlet import api, timer
from greentest import tests
class TestTimer(tests.TestCase):
class TestTimer(TestCase):
mode = 'static'
def test_copy(self):
t = timer.Timer(0, lambda: None)
@@ -62,4 +61,4 @@ class TestTimer(tests.TestCase):
assert not hub.running
if __name__ == '__main__':
unittest.main()
main()

View File

@@ -13,16 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from eventlet import coros, api, tpool
from greentest import tests
from sys import stdout
import time
from unittest import TestCase, main
import random
r = random.WichmannHill()
import uuid
from eventlet import coros, api, tpool
r = random.WichmannHill()
_g_debug = False
def prnt(msg):
@@ -59,7 +58,7 @@ one = 1
two = 2
three = 3
class TestTpool(tests.TestCase):
class TestTpool(TestCase):
def setUp(self):
# turn off exception printing, because we'll be deliberately
# triggering exceptions in our tests
@@ -214,4 +213,4 @@ class TestTpool(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()

View File

@@ -21,20 +21,20 @@
import cgi
import os
from unittest import TestCase, main
from eventlet import api
from eventlet import wsgi
from eventlet import processes
from greentest import find_command
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from greentest import tests
def hello_world(env, start_response):
if env['PATH_INFO'] == 'notexist':
start_response('404 Not Found', [('Content-type', 'text/plain')])
@@ -104,7 +104,7 @@ def read_http(sock):
return response_line, headers, body
class TestHttpd(tests.TestCase):
class TestHttpd(TestCase):
mode = 'static'
def setUp(self):
self.logfile = StringIO()
@@ -167,7 +167,7 @@ class TestHttpd(tests.TestCase):
def skip_test_005_run_apachebench(self):
url = 'http://localhost:12346/'
# ab is apachebench
out = processes.Process(tests.find_command('ab'),
out = processes.Process(find_command('ab'),
['-c','64','-n','1024', '-k', url])
print out.read()
@@ -293,4 +293,4 @@ class TestHttpd(tests.TestCase):
if __name__ == '__main__':
tests.main()
main()