diff --git a/tests/__init__.py b/tests/__init__.py
index 2234f0d..06596f7 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -185,18 +185,14 @@ class LimitedTestCase(unittest.TestCase):
verify_hub_empty()
def assert_less_than(self, a, b, msg=None):
- if msg:
- self.assert_(a < b, msg)
- else:
- self.assert_(a < b, "%s not less than %s" % (a, b))
+ msg = msg or "%s not less than %s" % (a, b)
+ assert a < b, msg
assertLessThan = assert_less_than
def assert_less_than_equal(self, a, b, msg=None):
- if msg:
- self.assert_(a <= b, msg)
- else:
- self.assert_(a <= b, "%s not less than or equal to %s" % (a, b))
+ msg = msg or "%s not less than or equal to %s" % (a, b)
+ assert a <= b, msg
assertLessThanEqual = assert_less_than_equal
diff --git a/tests/backdoor_test.py b/tests/backdoor_test.py
index 5df6d35..f304d3f 100644
--- a/tests/backdoor_test.py
+++ b/tests/backdoor_test.py
@@ -14,10 +14,10 @@ class BackdoorTest(LimitedTestCase):
client = socket.socket()
client.connect(('localhost', listener.getsockname()[1]))
f = client.makefile('rw')
- self.assert_(b'Python' in f.readline())
+ assert b'Python' in f.readline()
f.readline() # build info
f.readline() # help info
- self.assert_(b'InteractiveConsole' in f.readline())
+ assert b'InteractiveConsole' in f.readline()
self.assertEqual(b'>>> ', f.read(4))
f.write(b'print("hi")\n')
f.flush()
diff --git a/tests/db_pool_test.py b/tests/db_pool_test.py
index 4bdccc7..ce941ab 100644
--- a/tests/db_pool_test.py
+++ b/tests/db_pool_test.py
@@ -74,10 +74,10 @@ class DBConnectionPool(DBTester):
def assert_cursor_works(self, cursor):
cursor.execute("select 1")
rows = cursor.fetchall()
- self.assert_(rows)
+ assert rows
def test_connecting(self):
- self.assert_(self.connection is not None)
+ assert self.connection is not None
def test_create_cursor(self):
cursor = self.connection.cursor()
@@ -92,7 +92,7 @@ class DBConnectionPool(DBTester):
cursor = self.connection.cursor()
try:
cursor.execute("garbage blah blah")
- self.assert_(False)
+ assert False
except AssertionError:
raise
except Exception:
@@ -101,39 +101,39 @@ class DBConnectionPool(DBTester):
def test_put_none(self):
# the pool is of size 1, and its only connection is out
- self.assert_(self.pool.free() == 0)
+ assert self.pool.free() == 0
self.pool.put(None)
# ha ha we fooled it into thinking that we had a dead process
- self.assert_(self.pool.free() == 1)
+ assert self.pool.free() == 1
conn2 = self.pool.get()
- self.assert_(conn2 is not None)
- self.assert_(conn2.cursor)
+ assert conn2 is not None
+ assert conn2.cursor
self.pool.put(conn2)
def test_close_does_a_put(self):
- self.assert_(self.pool.free() == 0)
+ assert self.pool.free() == 0
self.connection.close()
- self.assert_(self.pool.free() == 1)
+ assert self.pool.free() == 1
self.assertRaises(AttributeError, self.connection.cursor)
@skipped
def test_deletion_does_a_put(self):
# doing a put on del causes some issues if __del__ is called in the
# main coroutine, so, not doing that for now
- self.assert_(self.pool.free() == 0)
+ assert self.pool.free() == 0
self.connection = None
- self.assert_(self.pool.free() == 1)
+ assert self.pool.free() == 1
def test_put_doesnt_double_wrap(self):
self.pool.put(self.connection)
conn = self.pool.get()
- self.assert_(not isinstance(conn._base, db_pool.PooledConnectionWrapper))
+ assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
self.pool.put(conn)
def test_bool(self):
- self.assert_(self.connection)
+ assert self.connection
self.connection.close()
- self.assert_(not self.connection)
+ assert not self.connection
def fill_up_table(self, conn):
curs = conn.cursor()
@@ -268,7 +268,7 @@ class DBConnectionPool(DBTester):
self.assert_(isinstance(self.connection,
db_pool.GenericConnectionWrapper))
conn = self.pool._unwrap_connection(self.connection)
- self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper))
+ assert not isinstance(conn, db_pool.GenericConnectionWrapper)
self.assertEqual(None, self.pool._unwrap_connection(None))
self.assertEqual(None, self.pool._unwrap_connection(1))
diff --git a/tests/debug_test.py b/tests/debug_test.py
index da6d7ab..67617f4 100644
--- a/tests/debug_test.py
+++ b/tests/debug_test.py
@@ -22,12 +22,12 @@ class TestSpew(TestCase):
def test_spew(self):
debug.spew()
- self.failUnless(isinstance(self.tracer, debug.Spew))
+ assert isinstance(self.tracer, debug.Spew)
def test_unspew(self):
debug.spew()
debug.unspew()
- self.failUnlessEqual(self.tracer, None)
+ assert self.tracer is None
def test_line(self):
sys.stdout = six.StringIO()
@@ -36,8 +36,8 @@ class TestSpew(TestCase):
s(f, "line", None)
lineno = f.f_lineno - 1 # -1 here since we called with frame f in the line above
output = sys.stdout.getvalue()
- self.failUnless("%s:%i" % (__name__, lineno) in output, "Didn't find line %i in %s" % (lineno, output))
- self.failUnless("f= 0)
- self.assert_(len(results2) > 0)
+ assert len(results1) > 0
+ assert len(results2) > 0
debug.hub_prevent_multiple_readers()
@skipped # by rdw because it fails but it's not clear how to make it pass
diff --git a/tests/greenpool_test.py b/tests/greenpool_test.py
index 90b2bfa..113b195 100644
--- a/tests/greenpool_test.py
+++ b/tests/greenpool_test.py
@@ -420,7 +420,7 @@ class Stress(tests.LimitedTestCase):
if received % 5 == 0:
eventlet.sleep(0.0001)
unique, order = i
- self.assert_(latest[unique] < order)
+ assert latest[unique] < order
latest[unique] = order
for l in latest[1:]:
self.assertEqual(l, iters - 1)
@@ -449,14 +449,14 @@ class Stress(tests.LimitedTestCase):
if latest == -1:
gc.collect()
initial_obj_count = len(gc.get_objects())
- self.assert_(i > latest)
+ assert i > latest
latest = i
if latest % 5 == 0:
eventlet.sleep(0.001)
if latest % 10 == 0:
gc.collect()
objs_created = len(gc.get_objects()) - initial_obj_count
- self.assert_(objs_created < 25 * concurrency, objs_created)
+ assert objs_created < 25 * concurrency, objs_created
# make sure we got to the end
self.assertEqual(latest, count - 1)
diff --git a/tests/greenthread_test.py b/tests/greenthread_test.py
index 9f0be11..0f7ba7b 100644
--- a/tests/greenthread_test.py
+++ b/tests/greenthread_test.py
@@ -15,28 +15,28 @@ def waiter(a):
class Asserts(object):
def assert_dead(self, gt):
if hasattr(gt, 'wait'):
- self.assertRaises(greenlet.GreenletExit, gt.wait)
- self.assert_(gt.dead)
- self.assert_(not gt)
+ self.assertRaises(greenlet.GreenletExit, gt.wait)
+ assert gt.dead
+ assert not gt
class Spawn(LimitedTestCase, Asserts):
def tearDown(self):
global _g_results
super(Spawn, self).tearDown()
_g_results = []
-
+
def test_simple(self):
gt = greenthread.spawn(passthru, 1, b=2)
self.assertEqual(gt.wait(), ((1,),{'b':2}))
self.assertEqual(_g_results, [((1,),{'b':2})])
-
+
def test_n(self):
gt = greenthread.spawn_n(passthru, 2, b=3)
- self.assert_(not gt.dead)
+ assert not gt.dead
greenthread.sleep(0)
- self.assert_(gt.dead)
+ assert gt.dead
self.assertEqual(_g_results, [((2,),{'b':3})])
-
+
def test_kill(self):
gt = greenthread.spawn(passthru, 6)
greenthread.kill(gt)
@@ -45,7 +45,7 @@ class Spawn(LimitedTestCase, Asserts):
self.assertEqual(_g_results, [])
greenthread.kill(gt)
self.assert_dead(gt)
-
+
def test_kill_meth(self):
gt = greenthread.spawn(passthru, 6)
gt.kill()
@@ -54,7 +54,7 @@ class Spawn(LimitedTestCase, Asserts):
self.assertEqual(_g_results, [])
gt.kill()
self.assert_dead(gt)
-
+
def test_kill_n(self):
gt = greenthread.spawn_n(passthru, 7)
greenthread.kill(gt)
@@ -63,7 +63,7 @@ class Spawn(LimitedTestCase, Asserts):
self.assertEqual(_g_results, [])
greenthread.kill(gt)
self.assert_dead(gt)
-
+
def test_link(self):
results = []
def link_func(g, *a, **kw):
@@ -74,7 +74,7 @@ class Spawn(LimitedTestCase, Asserts):
gt.link(link_func, 4, b=5)
self.assertEqual(gt.wait(), ((5,), {}))
self.assertEqual(results, [gt, (4,), {'b':5}])
-
+
def test_link_after_exited(self):
results = []
def link_func(g, *a, **kw):
@@ -105,7 +105,7 @@ class SpawnAfter(Spawn):
def test_basic(self):
gt = greenthread.spawn_after(0.1, passthru, 20)
self.assertEqual(gt.wait(), ((20,), {}))
-
+
def test_cancel(self):
gt = greenthread.spawn_after(0.1, passthru, 21)
gt.cancel()
@@ -116,7 +116,7 @@ class SpawnAfter(Spawn):
greenthread.sleep(0)
gt.cancel()
self.assertEqual(gt.wait(), 22)
-
+
def test_kill_already_started(self):
gt = greenthread.spawn_after(0, waiter, 22)
greenthread.sleep(0)
diff --git a/tests/hub_test.py b/tests/hub_test.py
index b7f307d..8110d63 100644
--- a/tests/hub_test.py
+++ b/tests/hub_test.py
@@ -215,7 +215,7 @@ class TestHubSelection(LimitedTestCase):
oldhub = hubs.get_hub()
try:
hubs.use_hub(Foo)
- self.assert_(isinstance(hubs.get_hub(), Foo), hubs.get_hub())
+ assert isinstance(hubs.get_hub(), Foo), hubs.get_hub()
finally:
hubs._threadlocal.hub = oldhub
@@ -282,7 +282,7 @@ except eventlet.Timeout:
os.kill(p.pid, signal.SIGCONT)
output, _ = p.communicate()
lines = output.decode('utf-8', 'replace').splitlines()
- self.assert_("exited correctly" in lines[-1], output)
+ assert "exited correctly" in lines[-1], output
shutil.rmtree(self.tempdir)
diff --git a/tests/mysqldb_test.py b/tests/mysqldb_test.py
index c4a77b0..82245d9 100644
--- a/tests/mysqldb_test.py
+++ b/tests/mysqldb_test.py
@@ -125,7 +125,7 @@ class TestMySQLdb(LimitedTestCase):
self.assertEqual(len(rows), 1)
self.assertEqual(len(rows[0]), 1)
self.assertEqual(rows[0][0], 1)
- self.assert_(counter[0] > 0, counter[0])
+ assert counter[0] > 0, counter[0]
gt.kill()
def assert_cursor_works(self, cursor):
@@ -145,10 +145,10 @@ class TestMySQLdb(LimitedTestCase):
for key in dir(orig):
if key not in ('__author__', '__path__', '__revision__',
'__version__', '__loader__'):
- self.assert_(hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key)))
+ assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key))
def test_connecting(self):
- self.assert_(self.connection is not None)
+ assert self.connection is not None
def test_connecting_annoyingly(self):
self.assert_connection_works(MySQLdb.Connect(**self._auth))
@@ -168,7 +168,7 @@ class TestMySQLdb(LimitedTestCase):
cursor = self.connection.cursor()
try:
cursor.execute("garbage blah blah")
- self.assert_(False)
+ assert False
except AssertionError:
raise
except Exception:
diff --git a/tests/patcher_psycopg_test.py b/tests/patcher_psycopg_test.py
index 3c837da..d4b4921 100644
--- a/tests/patcher_psycopg_test.py
+++ b/tests/patcher_psycopg_test.py
@@ -53,4 +53,4 @@ class PatchingPsycopg(patcher_test.ProcessBase):
print("Can't test psycopg2 patching; it's not installed.")
return
# if there's anything wrong with the test program it'll have a stack trace
- self.assert_(lines[0].startswith('done'), output)
+ assert lines[0].startswith('done'), output
diff --git a/tests/patcher_test.py b/tests/patcher_test.py
index f6e4ff7..7d12fd2 100644
--- a/tests/patcher_test.py
+++ b/tests/patcher_test.py
@@ -68,14 +68,14 @@ class ImportPatched(ProcessBase):
self.write_to_tempfile("patching", patching_module_contents)
self.write_to_tempfile("importing", import_module_contents)
output, lines = self.launch_subprocess('importing.py')
- self.assert_(lines[0].startswith('patcher'), repr(output))
- self.assert_(lines[1].startswith('base'), repr(output))
- self.assert_(lines[2].startswith('importing'), repr(output))
- self.assert_('eventlet.green.socket' in lines[1], repr(output))
- self.assert_('eventlet.green.urllib' in lines[1], repr(output))
- self.assert_('eventlet.green.socket' in lines[2], repr(output))
- self.assert_('eventlet.green.urllib' in lines[2], repr(output))
- self.assert_('eventlet.green.httplib' not in lines[2], repr(output))
+ assert lines[0].startswith('patcher'), repr(output)
+ assert lines[1].startswith('base'), repr(output)
+ assert lines[2].startswith('importing'), repr(output)
+ assert 'eventlet.green.socket' in lines[1], repr(output)
+ assert 'eventlet.green.urllib' in lines[1], repr(output)
+ assert 'eventlet.green.socket' in lines[2], repr(output)
+ assert 'eventlet.green.urllib' in lines[2], repr(output)
+ assert 'eventlet.green.httplib' not in lines[2], repr(output)
def test_import_patched_defaults(self):
self.write_to_tempfile("base", base_module_contents)
@@ -86,10 +86,10 @@ print("newmod {0} {1} {2}".format(base, base.socket, base.urllib.socket.socket))
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
- self.assert_(lines[0].startswith('base'), repr(output))
- self.assert_(lines[1].startswith('newmod'), repr(output))
- self.assert_('eventlet.green.socket' in lines[1], repr(output))
- self.assert_('GreenSocket' in lines[1], repr(output))
+ assert lines[0].startswith('base'), repr(output)
+ assert lines[1].startswith('newmod'), repr(output)
+ assert 'eventlet.green.socket' in lines[1], repr(output)
+ assert 'GreenSocket' in lines[1], repr(output)
class MonkeyPatch(ProcessBase):
@@ -103,7 +103,7 @@ print("newmod {0} {1}".format(socket.socket, urllib.socket.socket))
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
- self.assert_(lines[0].startswith('newmod'), repr(output))
+ assert lines[0].startswith('newmod'), repr(output)
self.assertEqual(lines[0].count('GreenSocket'), 2, repr(output))
def test_early_patching(self):
@@ -117,7 +117,7 @@ print("newmod")
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, repr(output))
- self.assert_(lines[0].startswith('newmod'), repr(output))
+ assert lines[0].startswith('newmod'), repr(output)
def test_late_patching(self):
new_mod = """
@@ -131,7 +131,7 @@ print("newmod")
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, repr(output))
- self.assert_(lines[0].startswith('newmod'), repr(output))
+ assert lines[0].startswith('newmod'), repr(output)
def test_typeerror(self):
new_mod = """
@@ -140,8 +140,8 @@ patcher.monkey_patch(finagle=True)
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
- self.assert_(lines[-2].startswith('TypeError'), repr(output))
- self.assert_('finagle' in lines[-2], repr(output))
+ assert lines[-2].startswith('TypeError'), repr(output)
+ assert 'finagle' in lines[-2], repr(output)
def assert_boolean_logic(self, call, expected, not_expected=''):
expected_list = ", ".join(['"%s"' % x for x in expected.split(',') if len(x)])
@@ -158,7 +158,7 @@ print("already_patched {0}".format(",".join(sorted(patcher.already_patched.keys(
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
ap = 'already_patched'
- self.assert_(lines[0].startswith(ap), repr(output))
+ assert lines[0].startswith(ap), repr(output)
patched_modules = lines[0][len(ap):].strip()
# psycopg might or might not be patched based on installed modules
patched_modules = patched_modules.replace("psycopg,", "")
@@ -245,9 +245,9 @@ tpool.killall()
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 3, output)
- self.assert_(lines[0].startswith('newmod'), repr(output))
- self.assert_('2' in lines[0], repr(output))
- self.assert_('3' in lines[1], repr(output))
+ assert lines[0].startswith('newmod'), repr(output)
+ assert '2' in lines[0], repr(output)
+ assert '3' in lines[1], repr(output)
@skip_with_pyevent
def test_unpatched_thread(self):
@@ -308,7 +308,7 @@ print(len(_threading._active))
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod')
self.assertEqual(len(lines), 4, "\n".join(lines))
- self.assert_(lines[0].startswith('= previtem)
+ assert item >= previtem
# make sure the tick happened at least a few times so that we know
# that our iterations in foo() were actually tpooled
- self.assert_(counter[0] > 10, counter[0])
+ assert counter[0] > 10, counter[0]
gt.kill()
@skip_with_pyevent
@@ -231,31 +231,31 @@ class TestTpool(LimitedTestCase):
@skip_with_pyevent
def test_autowrap(self):
x = tpool.Proxy({'a': 1, 'b': 2}, autowrap=(int,))
- self.assert_(isinstance(x.get('a'), tpool.Proxy))
- self.assert_(not isinstance(x.items(), tpool.Proxy))
+ assert isinstance(x.get('a'), tpool.Proxy)
+ assert not isinstance(x.items(), tpool.Proxy)
# attributes as well as callables
from tests import tpool_test
x = tpool.Proxy(tpool_test, autowrap=(int,))
- self.assert_(isinstance(x.one, tpool.Proxy))
- self.assert_(not isinstance(x.none, tpool.Proxy))
+ assert isinstance(x.one, tpool.Proxy)
+ assert not isinstance(x.none, tpool.Proxy)
@skip_with_pyevent
def test_autowrap_names(self):
x = tpool.Proxy({'a': 1, 'b': 2}, autowrap_names=('get',))
- self.assert_(isinstance(x.get('a'), tpool.Proxy))
- self.assert_(not isinstance(x.items(), tpool.Proxy))
+ assert isinstance(x.get('a'), tpool.Proxy)
+ assert not isinstance(x.items(), tpool.Proxy)
from tests import tpool_test
x = tpool.Proxy(tpool_test, autowrap_names=('one',))
- self.assert_(isinstance(x.one, tpool.Proxy))
- self.assert_(not isinstance(x.two, tpool.Proxy))
+ assert isinstance(x.one, tpool.Proxy)
+ assert not isinstance(x.two, tpool.Proxy)
@skip_with_pyevent
def test_autowrap_both(self):
from tests import tpool_test
x = tpool.Proxy(tpool_test, autowrap=(int,), autowrap_names=('one',))
- self.assert_(isinstance(x.one, tpool.Proxy))
+ assert isinstance(x.one, tpool.Proxy)
# violating the abstraction to check that we didn't double-wrap
- self.assert_(not isinstance(x._obj, tpool.Proxy))
+ assert not isinstance(x._obj, tpool.Proxy)
@skip_with_pyevent
def test_callable(self):
@@ -265,7 +265,7 @@ class TestTpool(LimitedTestCase):
self.assertEqual(4, x(4))
# verify that it wraps return values if specified
x = tpool.Proxy(wrapped, autowrap_names=('__call__',))
- self.assert_(isinstance(x(4), tpool.Proxy))
+ assert isinstance(x(4), tpool.Proxy)
self.assertEqual("4", str(x(4)))
@skip_with_pyevent
diff --git a/tests/websocket_new_test.py b/tests/websocket_new_test.py
index 55569a4..284a867 100644
--- a/tests/websocket_new_test.py
+++ b/tests/websocket_new_test.py
@@ -159,7 +159,7 @@ class TestWebSocket(_TestBase):
resp = sock.recv(1024) # get the headers
sock.close() # close while the app is running
done_with_request.wait()
- self.assert_(not error_detected[0])
+ assert not error_detected[0]
def test_client_closing_connection_13(self):
error_detected = [False]
@@ -191,7 +191,7 @@ class TestWebSocket(_TestBase):
closeframe = struct.pack('!BBIH', 1 << 7 | 8, 1 << 7 | 2, 0, 1000)
sock.sendall(closeframe) # "Close the connection" packet.
done_with_request.wait()
- self.assert_(not error_detected[0])
+ assert not error_detected[0]
def test_client_invalid_packet_13(self):
error_detected = [False]
@@ -222,4 +222,4 @@ class TestWebSocket(_TestBase):
resp = sock.recv(1024) # get the headers
sock.sendall('\x07\xff') # Weird packet.
done_with_request.wait()
- self.assert_(not error_detected[0])
+ assert not error_detected[0]
diff --git a/tests/websocket_test.py b/tests/websocket_test.py
index 571f4c3..e2a2570 100644
--- a/tests/websocket_test.py
+++ b/tests/websocket_test.py
@@ -36,10 +36,10 @@ wsapp = WebSocketWSGI(handle)
class TestWebSocket(_TestBase):
TEST_TIMEOUT = 5
-
+
def set_site(self):
self.site = wsapp
-
+
def test_incorrect_headers(self):
def raiser():
try:
@@ -81,7 +81,7 @@ class TestWebSocket(_TestBase):
self.assertEqual(resp.status, 400)
self.assertEqual(resp.getheader('connection'), 'close')
self.assertEqual(resp.read(), '')
-
+
# Now, miss off key2
headers = dict(kv.split(': ') for kv in [
"Upgrade: WebSocket",
@@ -146,8 +146,8 @@ class TestWebSocket(_TestBase):
'Sec-WebSocket-Origin: http://localhost:%s' % self.port,
'Sec-WebSocket-Protocol: ws',
'Sec-WebSocket-Location: ws://localhost:%s/echo\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.port]))
-
-
+
+
def test_query_string(self):
# verify that the query string comes out the other side unscathed
connect = [
@@ -197,7 +197,7 @@ class TestWebSocket(_TestBase):
'Sec-WebSocket-Origin: http://localhost:%s' % self.port,
'Sec-WebSocket-Protocol: ws',
'Sec-WebSocket-Location: ws://localhost:%s/echo?\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.port]))
-
+
def test_sending_messages_to_websocket_75(self):
connect = [
@@ -330,7 +330,7 @@ class TestWebSocket(_TestBase):
resp = sock.recv(1024) # get the headers
sock.close() # close while the app is running
done_with_request.wait()
- self.assert_(not error_detected[0])
+ assert not error_detected[0]
def test_breaking_the_connection_76(self):
error_detected = [False]
@@ -363,8 +363,8 @@ class TestWebSocket(_TestBase):
resp = sock.recv(1024) # get the headers
sock.close() # close while the app is running
done_with_request.wait()
- self.assert_(not error_detected[0])
-
+ assert not error_detected[0]
+
def test_client_closing_connection_76(self):
error_detected = [False]
done_with_request = event.Event()
@@ -396,8 +396,8 @@ class TestWebSocket(_TestBase):
resp = sock.recv(1024) # get the headers
sock.sendall('\xff\x00') # "Close the connection" packet.
done_with_request.wait()
- self.assert_(not error_detected[0])
-
+ assert not error_detected[0]
+
def test_client_invalid_packet_76(self):
error_detected = [False]
done_with_request = event.Event()
@@ -429,8 +429,8 @@ class TestWebSocket(_TestBase):
resp = sock.recv(1024) # get the headers
sock.sendall('\xef\x00') # Weird packet.
done_with_request.wait()
- self.assert_(error_detected[0])
-
+ assert error_detected[0]
+
def test_server_closing_connect_76(self):
connect = [
"GET / HTTP/1.1",
@@ -479,7 +479,7 @@ class TestWebSocket(_TestBase):
sock.sendall('\r\n'.join(connect) + '\r\n\r\n')
resp = sock.recv(1024)
done_with_request.wait()
- self.assert_(error_detected[0])
+ assert error_detected[0]
def test_app_socket_errors_76(self):
error_detected = [False]
@@ -511,7 +511,7 @@ class TestWebSocket(_TestBase):
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024)
done_with_request.wait()
- self.assert_(error_detected[0])
+ assert error_detected[0]
class TestWebSocketSSL(_TestBase):
@@ -521,7 +521,7 @@ class TestWebSocketSSL(_TestBase):
@skip_if_no_ssl
def test_ssl_sending_messages(self):
s = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
- certfile=certificate_file,
+ certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
self.spawn_server(sock=s)
@@ -541,9 +541,9 @@ class TestWebSocketSSL(_TestBase):
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
first_resp = sock.recv(1024)
# make sure it sets the wss: protocol on the location header
- loc_line = [x for x in first_resp.split("\r\n")
+ loc_line = [x for x in first_resp.split("\r\n")
if x.lower().startswith('sec-websocket-location')][0]
- self.assert_("wss://localhost" in loc_line,
+ self.assert_("wss://localhost" in loc_line,
"Expecting wss protocol in location: %s" % loc_line)
sock.sendall('\x00hello\xFF')
result = sock.recv(1024)
@@ -584,11 +584,11 @@ class TestWebSocketObject(LimitedTestCase):
def test_send_to_ws(self):
ws = self.test_ws
ws.send(u'hello')
- self.assert_(ws.socket.sendall.called_with("\x00hello\xFF"))
+ assert ws.socket.sendall.called_with("\x00hello\xFF")
ws.send(10)
- self.assert_(ws.socket.sendall.called_with("\x0010\xFF"))
+ assert ws.socket.sendall.called_with("\x0010\xFF")
def test_close_ws(self):
ws = self.test_ws
ws.close()
- self.assert_(ws.socket.shutdown.called_with(True))
+ assert ws.socket.shutdown.called_with(True)
diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py
index 5bc71b0..4a19368 100644
--- a/tests/wsgi_test.py
+++ b/tests/wsgi_test.py
@@ -263,8 +263,8 @@ class TestHttpd(_TestBase):
result = fd.read()
fd.close()
## The server responds with the maximum version it supports
- self.assert_(result.startswith('HTTP'), result)
- self.assert_(result.endswith('hello world'))
+ assert result.startswith('HTTP'), result
+ assert result.endswith('hello world')
def test_002_keepalive(self):
sock = eventlet.connect(
@@ -388,7 +388,7 @@ class TestHttpd(_TestBase):
fd = sock.makefile('rw')
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
fd.flush()
- self.assert_('Transfer-Encoding: chunked' in fd.read())
+ assert 'Transfer-Encoding: chunked' in fd.read()
def test_010_no_chunked_http_1_0(self):
self.site.application = chunked_app
@@ -398,7 +398,7 @@ class TestHttpd(_TestBase):
fd = sock.makefile('rw')
fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n')
fd.flush()
- self.assert_('Transfer-Encoding: chunked' not in fd.read())
+ assert 'Transfer-Encoding: chunked' not in fd.read()
def test_011_multiple_chunks(self):
self.site.application = big_chunks
@@ -415,7 +415,7 @@ class TestHttpd(_TestBase):
break
else:
headers += line
- self.assert_('Transfer-Encoding: chunked' in headers)
+ assert 'Transfer-Encoding: chunked' in headers
chunks = 0
chunklen = int(fd.readline(), 16)
while chunklen:
@@ -423,7 +423,7 @@ class TestHttpd(_TestBase):
fd.read(chunklen)
fd.readline() # CRLF
chunklen = int(fd.readline(), 16)
- self.assert_(chunks > 1)
+ assert chunks > 1
response = fd.read()
# Require a CRLF to close the message body
self.assertEqual(response, '\r\n')
@@ -481,7 +481,7 @@ class TestHttpd(_TestBase):
if fd.readline() == '\r\n':
break
response = fd.read()
- self.assert_(response == 'oh hai', 'invalid response %s' % response)
+ assert response == 'oh hai', 'invalid response %s' % response
sock = eventlet.connect(('localhost', self.port))
fd = sock.makefile('rw')
@@ -493,7 +493,7 @@ class TestHttpd(_TestBase):
if fd.readline() == '\r\n':
break
response = fd.read()
- self.assert_(response == 'oh hai', 'invalid response %s' % response)
+ assert response == 'oh hai', 'invalid response %s' % response
sock = eventlet.connect(('localhost', self.port))
fd = sock.makefile('rw')
@@ -505,7 +505,7 @@ class TestHttpd(_TestBase):
if fd.readline() == '\r\n':
break
response = fd.read(8192)
- self.assert_(response == 'oh hai', 'invalid response %s' % response)
+ assert response == 'oh hai', 'invalid response %s' % response
def test_015_write(self):
self.site.application = use_write
@@ -514,15 +514,15 @@ class TestHttpd(_TestBase):
fd.write('GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
fd.flush()
result1 = read_http(sock)
- self.assert_('content-length' in result1.headers_lower)
+ assert 'content-length' in result1.headers_lower
sock = eventlet.connect(('localhost', self.port))
fd = sock.makefile('w')
fd.write('GET /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
fd.flush()
result2 = read_http(sock)
- self.assert_('transfer-encoding' in result2.headers_lower)
- self.assert_(result2.headers_lower['transfer-encoding'] == 'chunked')
+ assert 'transfer-encoding' in result2.headers_lower
+ assert result2.headers_lower['transfer-encoding'] == 'chunked'
def test_016_repeated_content_length(self):
"""
@@ -580,7 +580,7 @@ class TestHttpd(_TestBase):
client.close()
success = server_coro.wait()
- self.assert_(success)
+ assert success
def test_018_http_10_keepalive(self):
# verify that if an http/1.0 client sends connection: keep-alive
@@ -593,13 +593,13 @@ class TestHttpd(_TestBase):
fd.flush()
result1 = read_http(sock)
- self.assert_('connection' in result1.headers_lower)
+ assert 'connection' in result1.headers_lower
self.assertEqual('keep-alive', result1.headers_lower['connection'])
# repeat request to verify connection is actually still open
fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
fd.flush()
result2 = read_http(sock)
- self.assert_('connection' in result2.headers_lower)
+ assert 'connection' in result2.headers_lower
self.assertEqual('keep-alive', result2.headers_lower['connection'])
sock.close()
@@ -621,7 +621,7 @@ class TestHttpd(_TestBase):
'2\r\noh\r\n'
'4\r\n hai\r\n0\r\n\r\n')
fd.flush()
- self.assert_('hello!' in fd.read())
+ assert 'hello!' in fd.read()
def test_020_x_forwarded_for(self):
request_bytes = (
@@ -633,7 +633,7 @@ class TestHttpd(_TestBase):
sock.sendall(request_bytes)
sock.recv(1024)
sock.close()
- self.assert_('1.2.3.4,5.6.7.8,127.0.0.1' in self.logfile.getvalue())
+ assert '1.2.3.4,5.6.7.8,127.0.0.1' in self.logfile.getvalue()
# turning off the option should work too
self.logfile = six.StringIO()
@@ -643,9 +643,9 @@ class TestHttpd(_TestBase):
sock.sendall(request_bytes)
sock.recv(1024)
sock.close()
- self.assert_('1.2.3.4' not in self.logfile.getvalue())
- self.assert_('5.6.7.8' not in self.logfile.getvalue())
- self.assert_('127.0.0.1' in self.logfile.getvalue())
+ assert '1.2.3.4' not in self.logfile.getvalue()
+ assert '5.6.7.8' not in self.logfile.getvalue()
+ assert '127.0.0.1' in self.logfile.getvalue()
def test_socket_remains_open(self):
greenthread.kill(self.killer)
@@ -659,8 +659,8 @@ class TestHttpd(_TestBase):
fd.flush()
result = fd.read(1024)
fd.close()
- self.assert_(result.startswith('HTTP'), result)
- self.assert_(result.endswith('hello world'))
+ assert result.startswith('HTTP'), result
+ assert result.endswith('hello world')
# shut down the server and verify the server_socket fd is still open,
# but the actual socketobject passed in to wsgi.server is closed
@@ -678,8 +678,8 @@ class TestHttpd(_TestBase):
fd.flush()
result = fd.read(1024)
fd.close()
- self.assert_(result.startswith('HTTP'), result)
- self.assert_(result.endswith('hello world'))
+ assert result.startswith('HTTP'), result
+ assert result.endswith('hello world')
def test_021_environ_clobbering(self):
def clobberin_time(environ, start_response):
@@ -701,7 +701,7 @@ class TestHttpd(_TestBase):
'Connection: close\r\n'
'\r\n\r\n')
fd.flush()
- self.assert_('200 OK' in fd.read())
+ assert '200 OK' in fd.read()
def test_022_custom_pool(self):
# just test that it accepts the parameter for now
@@ -718,8 +718,8 @@ class TestHttpd(_TestBase):
fd.flush()
result = fd.read()
fd.close()
- self.assert_(result.startswith('HTTP'), result)
- self.assert_(result.endswith('hello world'))
+ assert result.startswith('HTTP'), result
+ assert result.endswith('hello world')
def test_023_bad_content_length(self):
sock = eventlet.connect(
@@ -729,9 +729,9 @@ class TestHttpd(_TestBase):
fd.flush()
result = fd.read()
fd.close()
- self.assert_(result.startswith('HTTP'), result)
- self.assert_('400 Bad Request' in result)
- self.assert_('500' not in result)
+ assert result.startswith('HTTP'), result
+ assert '400 Bad Request' in result
+ assert '500' not in result
def test_024_expect_100_continue(self):
def wsgi_app(environ, start_response):
@@ -759,7 +759,7 @@ class TestHttpd(_TestBase):
break
else:
header_lines.append(line)
- self.assert_(header_lines[0].startswith('HTTP/1.1 100 Continue'))
+ assert header_lines[0].startswith('HTTP/1.1 100 Continue')
header_lines = []
while True:
line = fd.readline()
@@ -767,7 +767,7 @@ class TestHttpd(_TestBase):
break
else:
header_lines.append(line)
- self.assert_(header_lines[0].startswith('HTTP/1.1 200 OK'))
+ assert header_lines[0].startswith('HTTP/1.1 200 OK')
self.assertEqual(fd.read(7), 'testing')
fd.close()
sock.close()
@@ -789,8 +789,8 @@ class TestHttpd(_TestBase):
except socket.error as exc:
self.assertEqual(get_errno(exc), errno.ECONNREFUSED)
- self.assert_('Invalid argument' in self.logfile.getvalue(),
- self.logfile.getvalue())
+ log_content = self.logfile.getvalue()
+ assert 'Invalid argument' in log_content, log_content
finally:
sys.stderr = old_stderr
debug.hub_exceptions(False)
@@ -801,7 +801,7 @@ class TestHttpd(_TestBase):
sock.sendall('GET /yo! HTTP/1.1\r\nHost: localhost\r\n\r\n')
sock.recv(1024)
sock.close()
- self.assert_('\nHI GET /yo! HTTP/1.1 HI\n' in self.logfile.getvalue(), self.logfile.getvalue())
+ assert '\nHI GET /yo! HTTP/1.1 HI\n' in self.logfile.getvalue(), self.logfile.getvalue()
def test_close_chunked_with_1_0_client(self):
# verify that if we return a generator from our app
@@ -905,14 +905,14 @@ class TestHttpd(_TestBase):
else: # close sock prematurely
client.close()
eventlet.sleep(0) # let context switch back to server
- self.assert_(not errored[0], errored[0])
+ assert not errored[0], errored[0]
# make another request to ensure the server's still alive
try:
client = ssl.wrap_socket(eventlet.connect(('localhost', port)))
client.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
result = client.read()
- self.assert_(result.startswith('HTTP'), result)
- self.assert_(result.endswith('hello world'))
+ assert result.startswith('HTTP'), result
+ assert result.endswith('hello world')
except ImportError:
pass # TODO: should test with OpenSSL
greenthread.kill(g)
@@ -1042,7 +1042,7 @@ class TestHttpd(_TestBase):
headers.append(h)
if h == '':
break
- self.assert_('Transfer-Encoding: chunked' in ''.join(headers))
+ assert 'Transfer-Encoding: chunked' in ''.join(headers)
# should only be one chunk of zero size with two blank lines
# (one terminates the chunk, one terminates the body)
self.assertEqual(response, ['0', '', ''])
@@ -1091,7 +1091,7 @@ class TestHttpd(_TestBase):
# the test passes if we successfully get here, and read all the data
# in spite of the early close
self.assertEqual(read_content.wait(), 'ok')
- self.assert_(blew_up[0])
+ assert blew_up[0]
def test_exceptions_close_connection(self):
def wsgi_app(environ, start_response):
@@ -1104,7 +1104,7 @@ class TestHttpd(_TestBase):
result = read_http(sock)
self.assertEqual(result.status, 'HTTP/1.1 500 Internal Server Error')
self.assertEqual(result.headers_lower['connection'], 'close')
- self.assert_('transfer-encoding' not in result.headers_lower)
+ assert 'transfer-encoding' not in result.headers_lower
def test_unicode_raises_error(self):
def wsgi_app(environ, start_response):
@@ -1119,7 +1119,7 @@ class TestHttpd(_TestBase):
result = read_http(sock)
self.assertEqual(result.status, 'HTTP/1.1 500 Internal Server Error')
self.assertEqual(result.headers_lower['connection'], 'close')
- self.assert_('unicode' in result.body)
+ assert 'unicode' in result.body
def test_path_info_decoding(self):
def wsgi_app(environ, start_response):
@@ -1133,8 +1133,8 @@ class TestHttpd(_TestBase):
fd.flush()
result = read_http(sock)
self.assertEqual(result.status, 'HTTP/1.1 200 OK')
- self.assert_('decoded: /a*b@@#3' in result.body)
- self.assert_('raw: /a*b@%40%233' in result.body)
+ assert 'decoded: /a*b@@#3' in result.body
+ assert 'raw: /a*b@%40%233' in result.body
def test_ipv6(self):
try:
@@ -1174,7 +1174,7 @@ class TestHttpd(_TestBase):
self.assertEqual(result1.status, 'HTTP/1.1 500 Internal Server Error')
self.assertEqual(result1.body, '')
self.assertEqual(result1.headers_lower['connection'], 'close')
- self.assert_('transfer-encoding' not in result1.headers_lower)
+ assert 'transfer-encoding' not in result1.headers_lower
# verify traceback when debugging enabled
self.spawn_server(debug=True)
@@ -1185,11 +1185,11 @@ class TestHttpd(_TestBase):
fd.flush()
result2 = read_http(sock)
self.assertEqual(result2.status, 'HTTP/1.1 500 Internal Server Error')
- self.assert_('intentional crash' in result2.body)
- self.assert_('RuntimeError' in result2.body)
- self.assert_('Traceback' in result2.body)
+ assert 'intentional crash' in result2.body
+ assert 'RuntimeError' in result2.body
+ assert 'Traceback' in result2.body
self.assertEqual(result2.headers_lower['connection'], 'close')
- self.assert_('transfer-encoding' not in result2.headers_lower)
+ assert 'transfer-encoding' not in result2.headers_lower
def test_client_disconnect(self):
"""Issue #95 Server must handle disconnect from client in the middle of response
@@ -1315,7 +1315,7 @@ class IterableAlreadyHandledTest(_TestBase):
fd.flush()
response_line, headers = read_headers(sock)
self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n')
- self.assert_('connection' not in headers)
+ assert 'connection' not in headers
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
fd.flush()
result = read_http(sock)
@@ -1473,7 +1473,7 @@ class TestChunkedInput(_TestBase):
except eventlet.Timeout:
pass
else:
- self.assert_(False)
+ assert False
self.yield_next_space = True
with eventlet.Timeout(.1):
@@ -1496,7 +1496,7 @@ class TestChunkedInput(_TestBase):
except eventlet.Timeout:
pass
else:
- self.assert_(False)
+ assert False
def test_close_before_finished(self):
got_signal = []