From a651a3097f7ba957ad29342dac011d5a9218e76b Mon Sep 17 00:00:00 2001 From: Sergey Shepelev Date: Wed, 16 Jul 2014 13:54:42 +0400 Subject: [PATCH] tests: deprecated TestCase.assert_() -> assert keyword https://github.com/eventlet/eventlet/issues/101 --- tests/__init__.py | 12 ++-- tests/backdoor_test.py | 4 +- tests/db_pool_test.py | 30 +++++----- tests/debug_test.py | 29 +++++----- tests/env_test.py | 5 +- tests/greenio_test.py | 22 +++---- tests/greenpool_test.py | 6 +- tests/greenthread_test.py | 28 ++++----- tests/hub_test.py | 4 +- tests/mysqldb_test.py | 8 +-- tests/patcher_psycopg_test.py | 2 +- tests/patcher_test.py | 54 +++++++++--------- tests/processes_test.py | 6 +- tests/queue_test.py | 12 ++-- tests/test__coros_queue.py | 6 +- tests/thread_test.py | 27 +++++---- tests/timeout_test.py | 11 ++-- tests/tpool_test.py | 42 +++++++------- tests/websocket_new_test.py | 6 +- tests/websocket_test.py | 42 +++++++------- tests/wsgi_test.py | 104 +++++++++++++++++----------------- 21 files changed, 230 insertions(+), 230 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 2234f0d..06596f7 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -185,18 +185,14 @@ class LimitedTestCase(unittest.TestCase): verify_hub_empty() def assert_less_than(self, a, b, msg=None): - if msg: - self.assert_(a < b, msg) - else: - self.assert_(a < b, "%s not less than %s" % (a, b)) + msg = msg or "%s not less than %s" % (a, b) + assert a < b, msg assertLessThan = assert_less_than def assert_less_than_equal(self, a, b, msg=None): - if msg: - self.assert_(a <= b, msg) - else: - self.assert_(a <= b, "%s not less than or equal to %s" % (a, b)) + msg = msg or "%s not less than or equal to %s" % (a, b) + assert a <= b, msg assertLessThanEqual = assert_less_than_equal diff --git a/tests/backdoor_test.py b/tests/backdoor_test.py index 5df6d35..f304d3f 100644 --- a/tests/backdoor_test.py +++ b/tests/backdoor_test.py @@ -14,10 +14,10 @@ class BackdoorTest(LimitedTestCase): client = socket.socket() client.connect(('localhost', listener.getsockname()[1])) f = client.makefile('rw') - self.assert_(b'Python' in f.readline()) + assert b'Python' in f.readline() f.readline() # build info f.readline() # help info - self.assert_(b'InteractiveConsole' in f.readline()) + assert b'InteractiveConsole' in f.readline() self.assertEqual(b'>>> ', f.read(4)) f.write(b'print("hi")\n') f.flush() diff --git a/tests/db_pool_test.py b/tests/db_pool_test.py index 4bdccc7..ce941ab 100644 --- a/tests/db_pool_test.py +++ b/tests/db_pool_test.py @@ -74,10 +74,10 @@ class DBConnectionPool(DBTester): def assert_cursor_works(self, cursor): cursor.execute("select 1") rows = cursor.fetchall() - self.assert_(rows) + assert rows def test_connecting(self): - self.assert_(self.connection is not None) + assert self.connection is not None def test_create_cursor(self): cursor = self.connection.cursor() @@ -92,7 +92,7 @@ class DBConnectionPool(DBTester): cursor = self.connection.cursor() try: cursor.execute("garbage blah blah") - self.assert_(False) + assert False except AssertionError: raise except Exception: @@ -101,39 +101,39 @@ class DBConnectionPool(DBTester): def test_put_none(self): # the pool is of size 1, and its only connection is out - self.assert_(self.pool.free() == 0) + assert self.pool.free() == 0 self.pool.put(None) # ha ha we fooled it into thinking that we had a dead process - self.assert_(self.pool.free() == 1) + assert self.pool.free() == 1 conn2 = self.pool.get() - self.assert_(conn2 is not None) - self.assert_(conn2.cursor) + assert conn2 is not None + assert conn2.cursor self.pool.put(conn2) def test_close_does_a_put(self): - self.assert_(self.pool.free() == 0) + assert self.pool.free() == 0 self.connection.close() - self.assert_(self.pool.free() == 1) + assert self.pool.free() == 1 self.assertRaises(AttributeError, self.connection.cursor) @skipped def test_deletion_does_a_put(self): # doing a put on del causes some issues if __del__ is called in the # main coroutine, so, not doing that for now - self.assert_(self.pool.free() == 0) + assert self.pool.free() == 0 self.connection = None - self.assert_(self.pool.free() == 1) + assert self.pool.free() == 1 def test_put_doesnt_double_wrap(self): self.pool.put(self.connection) conn = self.pool.get() - self.assert_(not isinstance(conn._base, db_pool.PooledConnectionWrapper)) + assert not isinstance(conn._base, db_pool.PooledConnectionWrapper) self.pool.put(conn) def test_bool(self): - self.assert_(self.connection) + assert self.connection self.connection.close() - self.assert_(not self.connection) + assert not self.connection def fill_up_table(self, conn): curs = conn.cursor() @@ -268,7 +268,7 @@ class DBConnectionPool(DBTester): self.assert_(isinstance(self.connection, db_pool.GenericConnectionWrapper)) conn = self.pool._unwrap_connection(self.connection) - self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper)) + assert not isinstance(conn, db_pool.GenericConnectionWrapper) self.assertEqual(None, self.pool._unwrap_connection(None)) self.assertEqual(None, self.pool._unwrap_connection(1)) diff --git a/tests/debug_test.py b/tests/debug_test.py index da6d7ab..67617f4 100644 --- a/tests/debug_test.py +++ b/tests/debug_test.py @@ -22,12 +22,12 @@ class TestSpew(TestCase): def test_spew(self): debug.spew() - self.failUnless(isinstance(self.tracer, debug.Spew)) + assert isinstance(self.tracer, debug.Spew) def test_unspew(self): debug.spew() debug.unspew() - self.failUnlessEqual(self.tracer, None) + assert self.tracer is None def test_line(self): sys.stdout = six.StringIO() @@ -36,8 +36,8 @@ class TestSpew(TestCase): s(f, "line", None) lineno = f.f_lineno - 1 # -1 here since we called with frame f in the line above output = sys.stdout.getvalue() - self.failUnless("%s:%i" % (__name__, lineno) in output, "Didn't find line %i in %s" % (lineno, output)) - self.failUnless("f= 0) - self.assert_(len(results2) > 0) + assert len(results1) > 0 + assert len(results2) > 0 debug.hub_prevent_multiple_readers() @skipped # by rdw because it fails but it's not clear how to make it pass diff --git a/tests/greenpool_test.py b/tests/greenpool_test.py index 90b2bfa..113b195 100644 --- a/tests/greenpool_test.py +++ b/tests/greenpool_test.py @@ -420,7 +420,7 @@ class Stress(tests.LimitedTestCase): if received % 5 == 0: eventlet.sleep(0.0001) unique, order = i - self.assert_(latest[unique] < order) + assert latest[unique] < order latest[unique] = order for l in latest[1:]: self.assertEqual(l, iters - 1) @@ -449,14 +449,14 @@ class Stress(tests.LimitedTestCase): if latest == -1: gc.collect() initial_obj_count = len(gc.get_objects()) - self.assert_(i > latest) + assert i > latest latest = i if latest % 5 == 0: eventlet.sleep(0.001) if latest % 10 == 0: gc.collect() objs_created = len(gc.get_objects()) - initial_obj_count - self.assert_(objs_created < 25 * concurrency, objs_created) + assert objs_created < 25 * concurrency, objs_created # make sure we got to the end self.assertEqual(latest, count - 1) diff --git a/tests/greenthread_test.py b/tests/greenthread_test.py index 9f0be11..0f7ba7b 100644 --- a/tests/greenthread_test.py +++ b/tests/greenthread_test.py @@ -15,28 +15,28 @@ def waiter(a): class Asserts(object): def assert_dead(self, gt): if hasattr(gt, 'wait'): - self.assertRaises(greenlet.GreenletExit, gt.wait) - self.assert_(gt.dead) - self.assert_(not gt) + self.assertRaises(greenlet.GreenletExit, gt.wait) + assert gt.dead + assert not gt class Spawn(LimitedTestCase, Asserts): def tearDown(self): global _g_results super(Spawn, self).tearDown() _g_results = [] - + def test_simple(self): gt = greenthread.spawn(passthru, 1, b=2) self.assertEqual(gt.wait(), ((1,),{'b':2})) self.assertEqual(_g_results, [((1,),{'b':2})]) - + def test_n(self): gt = greenthread.spawn_n(passthru, 2, b=3) - self.assert_(not gt.dead) + assert not gt.dead greenthread.sleep(0) - self.assert_(gt.dead) + assert gt.dead self.assertEqual(_g_results, [((2,),{'b':3})]) - + def test_kill(self): gt = greenthread.spawn(passthru, 6) greenthread.kill(gt) @@ -45,7 +45,7 @@ class Spawn(LimitedTestCase, Asserts): self.assertEqual(_g_results, []) greenthread.kill(gt) self.assert_dead(gt) - + def test_kill_meth(self): gt = greenthread.spawn(passthru, 6) gt.kill() @@ -54,7 +54,7 @@ class Spawn(LimitedTestCase, Asserts): self.assertEqual(_g_results, []) gt.kill() self.assert_dead(gt) - + def test_kill_n(self): gt = greenthread.spawn_n(passthru, 7) greenthread.kill(gt) @@ -63,7 +63,7 @@ class Spawn(LimitedTestCase, Asserts): self.assertEqual(_g_results, []) greenthread.kill(gt) self.assert_dead(gt) - + def test_link(self): results = [] def link_func(g, *a, **kw): @@ -74,7 +74,7 @@ class Spawn(LimitedTestCase, Asserts): gt.link(link_func, 4, b=5) self.assertEqual(gt.wait(), ((5,), {})) self.assertEqual(results, [gt, (4,), {'b':5}]) - + def test_link_after_exited(self): results = [] def link_func(g, *a, **kw): @@ -105,7 +105,7 @@ class SpawnAfter(Spawn): def test_basic(self): gt = greenthread.spawn_after(0.1, passthru, 20) self.assertEqual(gt.wait(), ((20,), {})) - + def test_cancel(self): gt = greenthread.spawn_after(0.1, passthru, 21) gt.cancel() @@ -116,7 +116,7 @@ class SpawnAfter(Spawn): greenthread.sleep(0) gt.cancel() self.assertEqual(gt.wait(), 22) - + def test_kill_already_started(self): gt = greenthread.spawn_after(0, waiter, 22) greenthread.sleep(0) diff --git a/tests/hub_test.py b/tests/hub_test.py index b7f307d..8110d63 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -215,7 +215,7 @@ class TestHubSelection(LimitedTestCase): oldhub = hubs.get_hub() try: hubs.use_hub(Foo) - self.assert_(isinstance(hubs.get_hub(), Foo), hubs.get_hub()) + assert isinstance(hubs.get_hub(), Foo), hubs.get_hub() finally: hubs._threadlocal.hub = oldhub @@ -282,7 +282,7 @@ except eventlet.Timeout: os.kill(p.pid, signal.SIGCONT) output, _ = p.communicate() lines = output.decode('utf-8', 'replace').splitlines() - self.assert_("exited correctly" in lines[-1], output) + assert "exited correctly" in lines[-1], output shutil.rmtree(self.tempdir) diff --git a/tests/mysqldb_test.py b/tests/mysqldb_test.py index c4a77b0..82245d9 100644 --- a/tests/mysqldb_test.py +++ b/tests/mysqldb_test.py @@ -125,7 +125,7 @@ class TestMySQLdb(LimitedTestCase): self.assertEqual(len(rows), 1) self.assertEqual(len(rows[0]), 1) self.assertEqual(rows[0][0], 1) - self.assert_(counter[0] > 0, counter[0]) + assert counter[0] > 0, counter[0] gt.kill() def assert_cursor_works(self, cursor): @@ -145,10 +145,10 @@ class TestMySQLdb(LimitedTestCase): for key in dir(orig): if key not in ('__author__', '__path__', '__revision__', '__version__', '__loader__'): - self.assert_(hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key))) + assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key)) def test_connecting(self): - self.assert_(self.connection is not None) + assert self.connection is not None def test_connecting_annoyingly(self): self.assert_connection_works(MySQLdb.Connect(**self._auth)) @@ -168,7 +168,7 @@ class TestMySQLdb(LimitedTestCase): cursor = self.connection.cursor() try: cursor.execute("garbage blah blah") - self.assert_(False) + assert False except AssertionError: raise except Exception: diff --git a/tests/patcher_psycopg_test.py b/tests/patcher_psycopg_test.py index 3c837da..d4b4921 100644 --- a/tests/patcher_psycopg_test.py +++ b/tests/patcher_psycopg_test.py @@ -53,4 +53,4 @@ class PatchingPsycopg(patcher_test.ProcessBase): print("Can't test psycopg2 patching; it's not installed.") return # if there's anything wrong with the test program it'll have a stack trace - self.assert_(lines[0].startswith('done'), output) + assert lines[0].startswith('done'), output diff --git a/tests/patcher_test.py b/tests/patcher_test.py index f6e4ff7..7d12fd2 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -68,14 +68,14 @@ class ImportPatched(ProcessBase): self.write_to_tempfile("patching", patching_module_contents) self.write_to_tempfile("importing", import_module_contents) output, lines = self.launch_subprocess('importing.py') - self.assert_(lines[0].startswith('patcher'), repr(output)) - self.assert_(lines[1].startswith('base'), repr(output)) - self.assert_(lines[2].startswith('importing'), repr(output)) - self.assert_('eventlet.green.socket' in lines[1], repr(output)) - self.assert_('eventlet.green.urllib' in lines[1], repr(output)) - self.assert_('eventlet.green.socket' in lines[2], repr(output)) - self.assert_('eventlet.green.urllib' in lines[2], repr(output)) - self.assert_('eventlet.green.httplib' not in lines[2], repr(output)) + assert lines[0].startswith('patcher'), repr(output) + assert lines[1].startswith('base'), repr(output) + assert lines[2].startswith('importing'), repr(output) + assert 'eventlet.green.socket' in lines[1], repr(output) + assert 'eventlet.green.urllib' in lines[1], repr(output) + assert 'eventlet.green.socket' in lines[2], repr(output) + assert 'eventlet.green.urllib' in lines[2], repr(output) + assert 'eventlet.green.httplib' not in lines[2], repr(output) def test_import_patched_defaults(self): self.write_to_tempfile("base", base_module_contents) @@ -86,10 +86,10 @@ print("newmod {0} {1} {2}".format(base, base.socket, base.urllib.socket.socket)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') - self.assert_(lines[0].startswith('base'), repr(output)) - self.assert_(lines[1].startswith('newmod'), repr(output)) - self.assert_('eventlet.green.socket' in lines[1], repr(output)) - self.assert_('GreenSocket' in lines[1], repr(output)) + assert lines[0].startswith('base'), repr(output) + assert lines[1].startswith('newmod'), repr(output) + assert 'eventlet.green.socket' in lines[1], repr(output) + assert 'GreenSocket' in lines[1], repr(output) class MonkeyPatch(ProcessBase): @@ -103,7 +103,7 @@ print("newmod {0} {1}".format(socket.socket, urllib.socket.socket)) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') - self.assert_(lines[0].startswith('newmod'), repr(output)) + assert lines[0].startswith('newmod'), repr(output) self.assertEqual(lines[0].count('GreenSocket'), 2, repr(output)) def test_early_patching(self): @@ -117,7 +117,7 @@ print("newmod") self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 2, repr(output)) - self.assert_(lines[0].startswith('newmod'), repr(output)) + assert lines[0].startswith('newmod'), repr(output) def test_late_patching(self): new_mod = """ @@ -131,7 +131,7 @@ print("newmod") self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 2, repr(output)) - self.assert_(lines[0].startswith('newmod'), repr(output)) + assert lines[0].startswith('newmod'), repr(output) def test_typeerror(self): new_mod = """ @@ -140,8 +140,8 @@ patcher.monkey_patch(finagle=True) """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') - self.assert_(lines[-2].startswith('TypeError'), repr(output)) - self.assert_('finagle' in lines[-2], repr(output)) + assert lines[-2].startswith('TypeError'), repr(output) + assert 'finagle' in lines[-2], repr(output) def assert_boolean_logic(self, call, expected, not_expected=''): expected_list = ", ".join(['"%s"' % x for x in expected.split(',') if len(x)]) @@ -158,7 +158,7 @@ print("already_patched {0}".format(",".join(sorted(patcher.already_patched.keys( self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') ap = 'already_patched' - self.assert_(lines[0].startswith(ap), repr(output)) + assert lines[0].startswith(ap), repr(output) patched_modules = lines[0][len(ap):].strip() # psycopg might or might not be patched based on installed modules patched_modules = patched_modules.replace("psycopg,", "") @@ -245,9 +245,9 @@ tpool.killall() self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 3, output) - self.assert_(lines[0].startswith('newmod'), repr(output)) - self.assert_('2' in lines[0], repr(output)) - self.assert_('3' in lines[1], repr(output)) + assert lines[0].startswith('newmod'), repr(output) + assert '2' in lines[0], repr(output) + assert '3' in lines[1], repr(output) @skip_with_pyevent def test_unpatched_thread(self): @@ -308,7 +308,7 @@ print(len(_threading._active)) self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod') self.assertEqual(len(lines), 4, "\n".join(lines)) - self.assert_(lines[0].startswith('= previtem) + assert item >= previtem # make sure the tick happened at least a few times so that we know # that our iterations in foo() were actually tpooled - self.assert_(counter[0] > 10, counter[0]) + assert counter[0] > 10, counter[0] gt.kill() @skip_with_pyevent @@ -231,31 +231,31 @@ class TestTpool(LimitedTestCase): @skip_with_pyevent def test_autowrap(self): x = tpool.Proxy({'a': 1, 'b': 2}, autowrap=(int,)) - self.assert_(isinstance(x.get('a'), tpool.Proxy)) - self.assert_(not isinstance(x.items(), tpool.Proxy)) + assert isinstance(x.get('a'), tpool.Proxy) + assert not isinstance(x.items(), tpool.Proxy) # attributes as well as callables from tests import tpool_test x = tpool.Proxy(tpool_test, autowrap=(int,)) - self.assert_(isinstance(x.one, tpool.Proxy)) - self.assert_(not isinstance(x.none, tpool.Proxy)) + assert isinstance(x.one, tpool.Proxy) + assert not isinstance(x.none, tpool.Proxy) @skip_with_pyevent def test_autowrap_names(self): x = tpool.Proxy({'a': 1, 'b': 2}, autowrap_names=('get',)) - self.assert_(isinstance(x.get('a'), tpool.Proxy)) - self.assert_(not isinstance(x.items(), tpool.Proxy)) + assert isinstance(x.get('a'), tpool.Proxy) + assert not isinstance(x.items(), tpool.Proxy) from tests import tpool_test x = tpool.Proxy(tpool_test, autowrap_names=('one',)) - self.assert_(isinstance(x.one, tpool.Proxy)) - self.assert_(not isinstance(x.two, tpool.Proxy)) + assert isinstance(x.one, tpool.Proxy) + assert not isinstance(x.two, tpool.Proxy) @skip_with_pyevent def test_autowrap_both(self): from tests import tpool_test x = tpool.Proxy(tpool_test, autowrap=(int,), autowrap_names=('one',)) - self.assert_(isinstance(x.one, tpool.Proxy)) + assert isinstance(x.one, tpool.Proxy) # violating the abstraction to check that we didn't double-wrap - self.assert_(not isinstance(x._obj, tpool.Proxy)) + assert not isinstance(x._obj, tpool.Proxy) @skip_with_pyevent def test_callable(self): @@ -265,7 +265,7 @@ class TestTpool(LimitedTestCase): self.assertEqual(4, x(4)) # verify that it wraps return values if specified x = tpool.Proxy(wrapped, autowrap_names=('__call__',)) - self.assert_(isinstance(x(4), tpool.Proxy)) + assert isinstance(x(4), tpool.Proxy) self.assertEqual("4", str(x(4))) @skip_with_pyevent diff --git a/tests/websocket_new_test.py b/tests/websocket_new_test.py index 55569a4..284a867 100644 --- a/tests/websocket_new_test.py +++ b/tests/websocket_new_test.py @@ -159,7 +159,7 @@ class TestWebSocket(_TestBase): resp = sock.recv(1024) # get the headers sock.close() # close while the app is running done_with_request.wait() - self.assert_(not error_detected[0]) + assert not error_detected[0] def test_client_closing_connection_13(self): error_detected = [False] @@ -191,7 +191,7 @@ class TestWebSocket(_TestBase): closeframe = struct.pack('!BBIH', 1 << 7 | 8, 1 << 7 | 2, 0, 1000) sock.sendall(closeframe) # "Close the connection" packet. done_with_request.wait() - self.assert_(not error_detected[0]) + assert not error_detected[0] def test_client_invalid_packet_13(self): error_detected = [False] @@ -222,4 +222,4 @@ class TestWebSocket(_TestBase): resp = sock.recv(1024) # get the headers sock.sendall('\x07\xff') # Weird packet. done_with_request.wait() - self.assert_(not error_detected[0]) + assert not error_detected[0] diff --git a/tests/websocket_test.py b/tests/websocket_test.py index 571f4c3..e2a2570 100644 --- a/tests/websocket_test.py +++ b/tests/websocket_test.py @@ -36,10 +36,10 @@ wsapp = WebSocketWSGI(handle) class TestWebSocket(_TestBase): TEST_TIMEOUT = 5 - + def set_site(self): self.site = wsapp - + def test_incorrect_headers(self): def raiser(): try: @@ -81,7 +81,7 @@ class TestWebSocket(_TestBase): self.assertEqual(resp.status, 400) self.assertEqual(resp.getheader('connection'), 'close') self.assertEqual(resp.read(), '') - + # Now, miss off key2 headers = dict(kv.split(': ') for kv in [ "Upgrade: WebSocket", @@ -146,8 +146,8 @@ class TestWebSocket(_TestBase): 'Sec-WebSocket-Origin: http://localhost:%s' % self.port, 'Sec-WebSocket-Protocol: ws', 'Sec-WebSocket-Location: ws://localhost:%s/echo\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.port])) - - + + def test_query_string(self): # verify that the query string comes out the other side unscathed connect = [ @@ -197,7 +197,7 @@ class TestWebSocket(_TestBase): 'Sec-WebSocket-Origin: http://localhost:%s' % self.port, 'Sec-WebSocket-Protocol: ws', 'Sec-WebSocket-Location: ws://localhost:%s/echo?\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.port])) - + def test_sending_messages_to_websocket_75(self): connect = [ @@ -330,7 +330,7 @@ class TestWebSocket(_TestBase): resp = sock.recv(1024) # get the headers sock.close() # close while the app is running done_with_request.wait() - self.assert_(not error_detected[0]) + assert not error_detected[0] def test_breaking_the_connection_76(self): error_detected = [False] @@ -363,8 +363,8 @@ class TestWebSocket(_TestBase): resp = sock.recv(1024) # get the headers sock.close() # close while the app is running done_with_request.wait() - self.assert_(not error_detected[0]) - + assert not error_detected[0] + def test_client_closing_connection_76(self): error_detected = [False] done_with_request = event.Event() @@ -396,8 +396,8 @@ class TestWebSocket(_TestBase): resp = sock.recv(1024) # get the headers sock.sendall('\xff\x00') # "Close the connection" packet. done_with_request.wait() - self.assert_(not error_detected[0]) - + assert not error_detected[0] + def test_client_invalid_packet_76(self): error_detected = [False] done_with_request = event.Event() @@ -429,8 +429,8 @@ class TestWebSocket(_TestBase): resp = sock.recv(1024) # get the headers sock.sendall('\xef\x00') # Weird packet. done_with_request.wait() - self.assert_(error_detected[0]) - + assert error_detected[0] + def test_server_closing_connect_76(self): connect = [ "GET / HTTP/1.1", @@ -479,7 +479,7 @@ class TestWebSocket(_TestBase): sock.sendall('\r\n'.join(connect) + '\r\n\r\n') resp = sock.recv(1024) done_with_request.wait() - self.assert_(error_detected[0]) + assert error_detected[0] def test_app_socket_errors_76(self): error_detected = [False] @@ -511,7 +511,7 @@ class TestWebSocket(_TestBase): sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') resp = sock.recv(1024) done_with_request.wait() - self.assert_(error_detected[0]) + assert error_detected[0] class TestWebSocketSSL(_TestBase): @@ -521,7 +521,7 @@ class TestWebSocketSSL(_TestBase): @skip_if_no_ssl def test_ssl_sending_messages(self): s = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), - certfile=certificate_file, + certfile=certificate_file, keyfile=private_key_file, server_side=True) self.spawn_server(sock=s) @@ -541,9 +541,9 @@ class TestWebSocketSSL(_TestBase): sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') first_resp = sock.recv(1024) # make sure it sets the wss: protocol on the location header - loc_line = [x for x in first_resp.split("\r\n") + loc_line = [x for x in first_resp.split("\r\n") if x.lower().startswith('sec-websocket-location')][0] - self.assert_("wss://localhost" in loc_line, + self.assert_("wss://localhost" in loc_line, "Expecting wss protocol in location: %s" % loc_line) sock.sendall('\x00hello\xFF') result = sock.recv(1024) @@ -584,11 +584,11 @@ class TestWebSocketObject(LimitedTestCase): def test_send_to_ws(self): ws = self.test_ws ws.send(u'hello') - self.assert_(ws.socket.sendall.called_with("\x00hello\xFF")) + assert ws.socket.sendall.called_with("\x00hello\xFF") ws.send(10) - self.assert_(ws.socket.sendall.called_with("\x0010\xFF")) + assert ws.socket.sendall.called_with("\x0010\xFF") def test_close_ws(self): ws = self.test_ws ws.close() - self.assert_(ws.socket.shutdown.called_with(True)) + assert ws.socket.shutdown.called_with(True) diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index 5bc71b0..4a19368 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -263,8 +263,8 @@ class TestHttpd(_TestBase): result = fd.read() fd.close() ## The server responds with the maximum version it supports - self.assert_(result.startswith('HTTP'), result) - self.assert_(result.endswith('hello world')) + assert result.startswith('HTTP'), result + assert result.endswith('hello world') def test_002_keepalive(self): sock = eventlet.connect( @@ -388,7 +388,7 @@ class TestHttpd(_TestBase): fd = sock.makefile('rw') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.flush() - self.assert_('Transfer-Encoding: chunked' in fd.read()) + assert 'Transfer-Encoding: chunked' in fd.read() def test_010_no_chunked_http_1_0(self): self.site.application = chunked_app @@ -398,7 +398,7 @@ class TestHttpd(_TestBase): fd = sock.makefile('rw') fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.flush() - self.assert_('Transfer-Encoding: chunked' not in fd.read()) + assert 'Transfer-Encoding: chunked' not in fd.read() def test_011_multiple_chunks(self): self.site.application = big_chunks @@ -415,7 +415,7 @@ class TestHttpd(_TestBase): break else: headers += line - self.assert_('Transfer-Encoding: chunked' in headers) + assert 'Transfer-Encoding: chunked' in headers chunks = 0 chunklen = int(fd.readline(), 16) while chunklen: @@ -423,7 +423,7 @@ class TestHttpd(_TestBase): fd.read(chunklen) fd.readline() # CRLF chunklen = int(fd.readline(), 16) - self.assert_(chunks > 1) + assert chunks > 1 response = fd.read() # Require a CRLF to close the message body self.assertEqual(response, '\r\n') @@ -481,7 +481,7 @@ class TestHttpd(_TestBase): if fd.readline() == '\r\n': break response = fd.read() - self.assert_(response == 'oh hai', 'invalid response %s' % response) + assert response == 'oh hai', 'invalid response %s' % response sock = eventlet.connect(('localhost', self.port)) fd = sock.makefile('rw') @@ -493,7 +493,7 @@ class TestHttpd(_TestBase): if fd.readline() == '\r\n': break response = fd.read() - self.assert_(response == 'oh hai', 'invalid response %s' % response) + assert response == 'oh hai', 'invalid response %s' % response sock = eventlet.connect(('localhost', self.port)) fd = sock.makefile('rw') @@ -505,7 +505,7 @@ class TestHttpd(_TestBase): if fd.readline() == '\r\n': break response = fd.read(8192) - self.assert_(response == 'oh hai', 'invalid response %s' % response) + assert response == 'oh hai', 'invalid response %s' % response def test_015_write(self): self.site.application = use_write @@ -514,15 +514,15 @@ class TestHttpd(_TestBase): fd.write('GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.flush() result1 = read_http(sock) - self.assert_('content-length' in result1.headers_lower) + assert 'content-length' in result1.headers_lower sock = eventlet.connect(('localhost', self.port)) fd = sock.makefile('w') fd.write('GET /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.flush() result2 = read_http(sock) - self.assert_('transfer-encoding' in result2.headers_lower) - self.assert_(result2.headers_lower['transfer-encoding'] == 'chunked') + assert 'transfer-encoding' in result2.headers_lower + assert result2.headers_lower['transfer-encoding'] == 'chunked' def test_016_repeated_content_length(self): """ @@ -580,7 +580,7 @@ class TestHttpd(_TestBase): client.close() success = server_coro.wait() - self.assert_(success) + assert success def test_018_http_10_keepalive(self): # verify that if an http/1.0 client sends connection: keep-alive @@ -593,13 +593,13 @@ class TestHttpd(_TestBase): fd.flush() result1 = read_http(sock) - self.assert_('connection' in result1.headers_lower) + assert 'connection' in result1.headers_lower self.assertEqual('keep-alive', result1.headers_lower['connection']) # repeat request to verify connection is actually still open fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n') fd.flush() result2 = read_http(sock) - self.assert_('connection' in result2.headers_lower) + assert 'connection' in result2.headers_lower self.assertEqual('keep-alive', result2.headers_lower['connection']) sock.close() @@ -621,7 +621,7 @@ class TestHttpd(_TestBase): '2\r\noh\r\n' '4\r\n hai\r\n0\r\n\r\n') fd.flush() - self.assert_('hello!' in fd.read()) + assert 'hello!' in fd.read() def test_020_x_forwarded_for(self): request_bytes = ( @@ -633,7 +633,7 @@ class TestHttpd(_TestBase): sock.sendall(request_bytes) sock.recv(1024) sock.close() - self.assert_('1.2.3.4,5.6.7.8,127.0.0.1' in self.logfile.getvalue()) + assert '1.2.3.4,5.6.7.8,127.0.0.1' in self.logfile.getvalue() # turning off the option should work too self.logfile = six.StringIO() @@ -643,9 +643,9 @@ class TestHttpd(_TestBase): sock.sendall(request_bytes) sock.recv(1024) sock.close() - self.assert_('1.2.3.4' not in self.logfile.getvalue()) - self.assert_('5.6.7.8' not in self.logfile.getvalue()) - self.assert_('127.0.0.1' in self.logfile.getvalue()) + assert '1.2.3.4' not in self.logfile.getvalue() + assert '5.6.7.8' not in self.logfile.getvalue() + assert '127.0.0.1' in self.logfile.getvalue() def test_socket_remains_open(self): greenthread.kill(self.killer) @@ -659,8 +659,8 @@ class TestHttpd(_TestBase): fd.flush() result = fd.read(1024) fd.close() - self.assert_(result.startswith('HTTP'), result) - self.assert_(result.endswith('hello world')) + assert result.startswith('HTTP'), result + assert result.endswith('hello world') # shut down the server and verify the server_socket fd is still open, # but the actual socketobject passed in to wsgi.server is closed @@ -678,8 +678,8 @@ class TestHttpd(_TestBase): fd.flush() result = fd.read(1024) fd.close() - self.assert_(result.startswith('HTTP'), result) - self.assert_(result.endswith('hello world')) + assert result.startswith('HTTP'), result + assert result.endswith('hello world') def test_021_environ_clobbering(self): def clobberin_time(environ, start_response): @@ -701,7 +701,7 @@ class TestHttpd(_TestBase): 'Connection: close\r\n' '\r\n\r\n') fd.flush() - self.assert_('200 OK' in fd.read()) + assert '200 OK' in fd.read() def test_022_custom_pool(self): # just test that it accepts the parameter for now @@ -718,8 +718,8 @@ class TestHttpd(_TestBase): fd.flush() result = fd.read() fd.close() - self.assert_(result.startswith('HTTP'), result) - self.assert_(result.endswith('hello world')) + assert result.startswith('HTTP'), result + assert result.endswith('hello world') def test_023_bad_content_length(self): sock = eventlet.connect( @@ -729,9 +729,9 @@ class TestHttpd(_TestBase): fd.flush() result = fd.read() fd.close() - self.assert_(result.startswith('HTTP'), result) - self.assert_('400 Bad Request' in result) - self.assert_('500' not in result) + assert result.startswith('HTTP'), result + assert '400 Bad Request' in result + assert '500' not in result def test_024_expect_100_continue(self): def wsgi_app(environ, start_response): @@ -759,7 +759,7 @@ class TestHttpd(_TestBase): break else: header_lines.append(line) - self.assert_(header_lines[0].startswith('HTTP/1.1 100 Continue')) + assert header_lines[0].startswith('HTTP/1.1 100 Continue') header_lines = [] while True: line = fd.readline() @@ -767,7 +767,7 @@ class TestHttpd(_TestBase): break else: header_lines.append(line) - self.assert_(header_lines[0].startswith('HTTP/1.1 200 OK')) + assert header_lines[0].startswith('HTTP/1.1 200 OK') self.assertEqual(fd.read(7), 'testing') fd.close() sock.close() @@ -789,8 +789,8 @@ class TestHttpd(_TestBase): except socket.error as exc: self.assertEqual(get_errno(exc), errno.ECONNREFUSED) - self.assert_('Invalid argument' in self.logfile.getvalue(), - self.logfile.getvalue()) + log_content = self.logfile.getvalue() + assert 'Invalid argument' in log_content, log_content finally: sys.stderr = old_stderr debug.hub_exceptions(False) @@ -801,7 +801,7 @@ class TestHttpd(_TestBase): sock.sendall('GET /yo! HTTP/1.1\r\nHost: localhost\r\n\r\n') sock.recv(1024) sock.close() - self.assert_('\nHI GET /yo! HTTP/1.1 HI\n' in self.logfile.getvalue(), self.logfile.getvalue()) + assert '\nHI GET /yo! HTTP/1.1 HI\n' in self.logfile.getvalue(), self.logfile.getvalue() def test_close_chunked_with_1_0_client(self): # verify that if we return a generator from our app @@ -905,14 +905,14 @@ class TestHttpd(_TestBase): else: # close sock prematurely client.close() eventlet.sleep(0) # let context switch back to server - self.assert_(not errored[0], errored[0]) + assert not errored[0], errored[0] # make another request to ensure the server's still alive try: client = ssl.wrap_socket(eventlet.connect(('localhost', port))) client.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n') result = client.read() - self.assert_(result.startswith('HTTP'), result) - self.assert_(result.endswith('hello world')) + assert result.startswith('HTTP'), result + assert result.endswith('hello world') except ImportError: pass # TODO: should test with OpenSSL greenthread.kill(g) @@ -1042,7 +1042,7 @@ class TestHttpd(_TestBase): headers.append(h) if h == '': break - self.assert_('Transfer-Encoding: chunked' in ''.join(headers)) + assert 'Transfer-Encoding: chunked' in ''.join(headers) # should only be one chunk of zero size with two blank lines # (one terminates the chunk, one terminates the body) self.assertEqual(response, ['0', '', '']) @@ -1091,7 +1091,7 @@ class TestHttpd(_TestBase): # the test passes if we successfully get here, and read all the data # in spite of the early close self.assertEqual(read_content.wait(), 'ok') - self.assert_(blew_up[0]) + assert blew_up[0] def test_exceptions_close_connection(self): def wsgi_app(environ, start_response): @@ -1104,7 +1104,7 @@ class TestHttpd(_TestBase): result = read_http(sock) self.assertEqual(result.status, 'HTTP/1.1 500 Internal Server Error') self.assertEqual(result.headers_lower['connection'], 'close') - self.assert_('transfer-encoding' not in result.headers_lower) + assert 'transfer-encoding' not in result.headers_lower def test_unicode_raises_error(self): def wsgi_app(environ, start_response): @@ -1119,7 +1119,7 @@ class TestHttpd(_TestBase): result = read_http(sock) self.assertEqual(result.status, 'HTTP/1.1 500 Internal Server Error') self.assertEqual(result.headers_lower['connection'], 'close') - self.assert_('unicode' in result.body) + assert 'unicode' in result.body def test_path_info_decoding(self): def wsgi_app(environ, start_response): @@ -1133,8 +1133,8 @@ class TestHttpd(_TestBase): fd.flush() result = read_http(sock) self.assertEqual(result.status, 'HTTP/1.1 200 OK') - self.assert_('decoded: /a*b@@#3' in result.body) - self.assert_('raw: /a*b@%40%233' in result.body) + assert 'decoded: /a*b@@#3' in result.body + assert 'raw: /a*b@%40%233' in result.body def test_ipv6(self): try: @@ -1174,7 +1174,7 @@ class TestHttpd(_TestBase): self.assertEqual(result1.status, 'HTTP/1.1 500 Internal Server Error') self.assertEqual(result1.body, '') self.assertEqual(result1.headers_lower['connection'], 'close') - self.assert_('transfer-encoding' not in result1.headers_lower) + assert 'transfer-encoding' not in result1.headers_lower # verify traceback when debugging enabled self.spawn_server(debug=True) @@ -1185,11 +1185,11 @@ class TestHttpd(_TestBase): fd.flush() result2 = read_http(sock) self.assertEqual(result2.status, 'HTTP/1.1 500 Internal Server Error') - self.assert_('intentional crash' in result2.body) - self.assert_('RuntimeError' in result2.body) - self.assert_('Traceback' in result2.body) + assert 'intentional crash' in result2.body + assert 'RuntimeError' in result2.body + assert 'Traceback' in result2.body self.assertEqual(result2.headers_lower['connection'], 'close') - self.assert_('transfer-encoding' not in result2.headers_lower) + assert 'transfer-encoding' not in result2.headers_lower def test_client_disconnect(self): """Issue #95 Server must handle disconnect from client in the middle of response @@ -1315,7 +1315,7 @@ class IterableAlreadyHandledTest(_TestBase): fd.flush() response_line, headers = read_headers(sock) self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n') - self.assert_('connection' not in headers) + assert 'connection' not in headers fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.flush() result = read_http(sock) @@ -1473,7 +1473,7 @@ class TestChunkedInput(_TestBase): except eventlet.Timeout: pass else: - self.assert_(False) + assert False self.yield_next_space = True with eventlet.Timeout(.1): @@ -1496,7 +1496,7 @@ class TestChunkedInput(_TestBase): except eventlet.Timeout: pass else: - self.assert_(False) + assert False def test_close_before_finished(self): got_signal = []