diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 71721ab9..de40876d 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2757,7 +2757,7 @@ class ResponseFuture(object): if self._paging_state is None: return self._final_result else: - return PagedResult(self, self._final_result) + return PagedResult(self, self._final_result, timeout) elif self._final_exception: raise self._final_exception else: @@ -2766,7 +2766,7 @@ class ResponseFuture(object): if self._paging_state is None: return self._final_result else: - return PagedResult(self, self._final_result) + return PagedResult(self, self._final_result, timeout) elif self._final_exception: raise self._final_exception else: @@ -2920,9 +2920,10 @@ class PagedResult(object): response_future = None - def __init__(self, response_future, initial_response): + def __init__(self, response_future, initial_response, timeout=_NOT_SET): self.response_future = response_future self.current_response = iter(initial_response) + self.timeout = timeout def __iter__(self): return self @@ -2935,7 +2936,7 @@ class PagedResult(object): raise self.response_future.start_fetching_next_page() - result = self.response_future.result() + result = self.response_future.result(self.timeout) if self.response_future.has_more_pages: self.current_response = result.current_response else: diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index 0f646da8..4ddef1ba 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -31,7 +31,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais ``parameters`` item must be a sequence or :const:`None`. A sequence of ``(success, result_or_exc)`` tuples is returned in the same - order that the statements were passed in. If ``success`` if :const:`False`, + order that the statements were passed in. If ``success`` is :const:`False`, there was an error executing the statement, and ``result_or_exc`` will be an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc`` will be the query result. diff --git a/tests/integration/standard/test_concurrent.py b/tests/integration/standard/test_concurrent.py index 35804db1..d8b7aac9 100644 --- a/tests/integration/standard/test_concurrent.py +++ b/tests/integration/standard/test_concurrent.py @@ -17,12 +17,12 @@ from tests.integration import PROTOCOL_VERSION try: import unittest2 as unittest except ImportError: - import unittest # noqa + import unittest # noqa from itertools import cycle from cassandra import InvalidRequest, ConsistencyLevel -from cassandra.cluster import Cluster +from cassandra.cluster import Cluster, PagedResult from cassandra.concurrent import (execute_concurrent, execute_concurrent_with_args) from cassandra.policies import HostDistance @@ -83,6 +83,31 @@ class ClusterTests(unittest.TestCase): self.assertEqual(num_statements, len(results)) self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results) + def test_execute_concurrent_paged_result(self): + num_statements = 201 + statement = SimpleStatement( + "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", + consistency_level=ConsistencyLevel.QUORUM) + parameters = [(i, i) for i in range(num_statements)] + + results = execute_concurrent_with_args(self.session, statement, parameters) + self.assertEqual(num_statements, len(results)) + self.assertEqual([(True, None)] * num_statements, results) + + # read + statement = SimpleStatement( + "SELECT * FROM test3rf.test LIMIT %s", + consistency_level=ConsistencyLevel.QUORUM, + fetch_size=int(num_statements/2)) + parameters = [(i, ) for i in range(num_statements)] + + results = execute_concurrent_with_args(self.session, statement, [(num_statements,)]) + self.assertEqual(1, len(results)) + self.assertTrue(results[0][0]) + result = results[0][1] + self.assertIsInstance(result, PagedResult) + self.assertEqual(num_statements, sum(1 for _ in result)) + def test_first_failure(self): statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", )) parameters = [(i, i) for i in range(100)]