Fixing timing issues in connection, and query trace tests
This commit is contained in:
@@ -227,10 +227,20 @@ class ConnectionTests(object):
|
|||||||
t.join()
|
t.join()
|
||||||
|
|
||||||
def test_connect_timeout(self):
|
def test_connect_timeout(self):
|
||||||
|
# Underlying socket implementations don't always throw a socket timeout even with min float
|
||||||
|
# This can be timing sensitive, added retry to ensure failure occurs if it can
|
||||||
|
max_retry_count = 10
|
||||||
|
exception_thrown = False
|
||||||
|
for i in range(max_retry_count):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
self.assertRaises(Exception, self.get_connection, timeout=sys.float_info.min)
|
try:
|
||||||
|
self.get_connection(timeout=sys.float_info.min)
|
||||||
|
except Exception as e:
|
||||||
end = time.time()
|
end = time.time()
|
||||||
self.assertAlmostEqual(start, end, 1)
|
self.assertAlmostEqual(start, end, 1)
|
||||||
|
exception_thrown = True
|
||||||
|
break
|
||||||
|
self.assertTrue(exception_thrown)
|
||||||
|
|
||||||
|
|
||||||
class AsyncoreConnectionTests(ConnectionTests, unittest.TestCase):
|
class AsyncoreConnectionTests(ConnectionTests, unittest.TestCase):
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ from cassandra.policies import HostDistance
|
|||||||
|
|
||||||
from tests.integration import use_singledc, PROTOCOL_VERSION, BasicSharedKeyspaceUnitTestCase, get_server_versions, greaterthanprotocolv3
|
from tests.integration import use_singledc, PROTOCOL_VERSION, BasicSharedKeyspaceUnitTestCase, get_server_versions, greaterthanprotocolv3
|
||||||
|
|
||||||
|
import time
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
|
||||||
@@ -155,15 +156,18 @@ class QueryTests(BasicSharedKeyspaceUnitTestCase):
|
|||||||
# Create table and run insert, then select
|
# Create table and run insert, then select
|
||||||
self.session.execute("CREATE TABLE {0} (k INT, i INT, PRIMARY KEY(k, i))".format(self.keyspace_table_name))
|
self.session.execute("CREATE TABLE {0} (k INT, i INT, PRIMARY KEY(k, i))".format(self.keyspace_table_name))
|
||||||
self.session.execute("INSERT INTO {0} (k, i) VALUES (0, 1)".format(self.keyspace_table_name))
|
self.session.execute("INSERT INTO {0} (k, i) VALUES (0, 1)".format(self.keyspace_table_name))
|
||||||
|
|
||||||
response_future = self.session.execute_async("SELECT i FROM {0} WHERE k=0".format(self.keyspace_table_name), trace=True)
|
response_future = self.session.execute_async("SELECT i FROM {0} WHERE k=0".format(self.keyspace_table_name), trace=True)
|
||||||
response_future.result()
|
response_future.result()
|
||||||
|
|
||||||
self.assertEqual(len(response_future._query_traces), 1)
|
self.assertEqual(len(response_future._query_traces), 1)
|
||||||
trace = response_future._query_traces[0]
|
trace = response_future._query_traces[0]
|
||||||
|
self.assertTrue(self._wait_for_trace_to_populate(trace.trace_id))
|
||||||
|
|
||||||
# Delete trace duration from the session (this is what the driver polls for "complete")
|
# Delete trace duration from the session (this is what the driver polls for "complete")
|
||||||
delete_statement = SimpleStatement("DELETE duration FROM system_traces.sessions WHERE session_id = {}".format(trace.trace_id), consistency_level=ConsistencyLevel.ALL)
|
delete_statement = SimpleStatement("DELETE duration FROM system_traces.sessions WHERE session_id = {}".format(trace.trace_id), consistency_level=ConsistencyLevel.ALL)
|
||||||
self.session.execute(delete_statement)
|
self.session.execute(delete_statement)
|
||||||
|
self.assertTrue(self._wait_for_trace_to_delete(trace.trace_id))
|
||||||
|
|
||||||
# should raise because duration is not set
|
# should raise because duration is not set
|
||||||
self.assertRaises(TraceUnavailable, trace.populate, max_wait=0.2, wait_for_complete=True)
|
self.assertRaises(TraceUnavailable, trace.populate, max_wait=0.2, wait_for_complete=True)
|
||||||
@@ -178,6 +182,29 @@ class QueryTests(BasicSharedKeyspaceUnitTestCase):
|
|||||||
self.assertTrue(trace.events) # non-zero list len
|
self.assertTrue(trace.events) # non-zero list len
|
||||||
self.assertIsNotNone(trace.started_at)
|
self.assertIsNotNone(trace.started_at)
|
||||||
|
|
||||||
|
def _wait_for_trace_to_populate(self, trace_id):
|
||||||
|
count = 0
|
||||||
|
retry_max = 10
|
||||||
|
while(not self._is_trace_present(trace_id) and count < retry_max):
|
||||||
|
time.sleep(.2)
|
||||||
|
count += 1
|
||||||
|
return count != retry_max
|
||||||
|
|
||||||
|
def _wait_for_trace_to_delete(self, trace_id):
|
||||||
|
count = 0
|
||||||
|
retry_max = 10
|
||||||
|
while(self._is_trace_present(trace_id) and count < retry_max):
|
||||||
|
time.sleep(.2)
|
||||||
|
count += 1
|
||||||
|
return count != retry_max
|
||||||
|
|
||||||
|
def _is_trace_present(self, trace_id):
|
||||||
|
select_statement = SimpleStatement("SElECT duration FROM system_traces.sessions WHERE session_id = {}".format(trace_id), consistency_level=ConsistencyLevel.ALL)
|
||||||
|
ssrs = self.session.execute(select_statement)
|
||||||
|
if(ssrs[0].duration is None):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
def test_column_names(self):
|
def test_column_names(self):
|
||||||
"""
|
"""
|
||||||
Test to validate the columns are present on the result set.
|
Test to validate the columns are present on the result set.
|
||||||
|
|||||||
Reference in New Issue
Block a user