Tweaks to #182. Set default timeout, add test.

Add default timeout for PagedResult class
Add integration test for concurrent with PagedResult
Minor comment correction
This commit is contained in:
Adam Holmberg
2014-08-14 15:02:40 -05:00
parent 5300d231c9
commit b4318bf580
3 changed files with 29 additions and 4 deletions

View File

@@ -2893,7 +2893,7 @@ class PagedResult(object):
response_future = None
def __init__(self, response_future, initial_response, timeout):
def __init__(self, response_future, initial_response, timeout=_NOT_SET):
self.response_future = response_future
self.current_response = iter(initial_response)
self.timeout = timeout

View File

@@ -31,7 +31,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
``parameters`` item must be a sequence or :const:`None`.
A sequence of ``(success, result_or_exc)`` tuples is returned in the same
order that the statements were passed in. If ``success`` if :const:`False`,
order that the statements were passed in. If ``success`` is :const:`False`,
there was an error executing the statement, and ``result_or_exc`` will be
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
will be the query result.

View File

@@ -17,12 +17,12 @@ from tests.integration import PROTOCOL_VERSION
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import unittest # noqa
from itertools import cycle
from cassandra import InvalidRequest, ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.cluster import Cluster, PagedResult
from cassandra.concurrent import (execute_concurrent,
execute_concurrent_with_args)
from cassandra.policies import HostDistance
@@ -83,6 +83,31 @@ class ClusterTests(unittest.TestCase):
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results)
def test_execute_concurrent_paged_result(self):
num_statements = 201
statement = SimpleStatement(
"INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
consistency_level=ConsistencyLevel.QUORUM)
parameters = [(i, i) for i in range(num_statements)]
results = execute_concurrent_with_args(self.session, statement, parameters)
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, None)] * num_statements, results)
# read
statement = SimpleStatement(
"SELECT * FROM test3rf.test LIMIT %s",
consistency_level=ConsistencyLevel.QUORUM,
fetch_size=int(num_statements/2))
parameters = [(i, ) for i in range(num_statements)]
results = execute_concurrent_with_args(self.session, statement, [(num_statements,)])
self.assertEqual(1, len(results))
self.assertTrue(results[0][0])
result = results[0][1]
self.assertIsInstance(result, PagedResult)
self.assertEqual(num_statements, sum(1 for _ in result))
def test_first_failure(self):
statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", ))
parameters = [(i, i) for i in range(100)]