Merge remote-tracking branch 'origin/paged-result-timeout'

This commit is contained in:
Adam Holmberg
2014-09-05 19:18:01 +00:00
3 changed files with 33 additions and 7 deletions

View File

@@ -2757,7 +2757,7 @@ class ResponseFuture(object):
if self._paging_state is None:
return self._final_result
else:
return PagedResult(self, self._final_result)
return PagedResult(self, self._final_result, timeout)
elif self._final_exception:
raise self._final_exception
else:
@@ -2766,7 +2766,7 @@ class ResponseFuture(object):
if self._paging_state is None:
return self._final_result
else:
return PagedResult(self, self._final_result)
return PagedResult(self, self._final_result, timeout)
elif self._final_exception:
raise self._final_exception
else:
@@ -2920,9 +2920,10 @@ class PagedResult(object):
response_future = None
def __init__(self, response_future, initial_response):
def __init__(self, response_future, initial_response, timeout=_NOT_SET):
self.response_future = response_future
self.current_response = iter(initial_response)
self.timeout = timeout
def __iter__(self):
return self
@@ -2935,7 +2936,7 @@ class PagedResult(object):
raise
self.response_future.start_fetching_next_page()
result = self.response_future.result()
result = self.response_future.result(self.timeout)
if self.response_future.has_more_pages:
self.current_response = result.current_response
else:

View File

@@ -31,7 +31,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
``parameters`` item must be a sequence or :const:`None`.
A sequence of ``(success, result_or_exc)`` tuples is returned in the same
order that the statements were passed in. If ``success`` if :const:`False`,
order that the statements were passed in. If ``success`` is :const:`False`,
there was an error executing the statement, and ``result_or_exc`` will be
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
will be the query result.

View File

@@ -17,12 +17,12 @@ from tests.integration import PROTOCOL_VERSION
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import unittest # noqa
from itertools import cycle
from cassandra import InvalidRequest, ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.cluster import Cluster, PagedResult
from cassandra.concurrent import (execute_concurrent,
execute_concurrent_with_args)
from cassandra.policies import HostDistance
@@ -83,6 +83,31 @@ class ClusterTests(unittest.TestCase):
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results)
def test_execute_concurrent_paged_result(self):
num_statements = 201
statement = SimpleStatement(
"INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
consistency_level=ConsistencyLevel.QUORUM)
parameters = [(i, i) for i in range(num_statements)]
results = execute_concurrent_with_args(self.session, statement, parameters)
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, None)] * num_statements, results)
# read
statement = SimpleStatement(
"SELECT * FROM test3rf.test LIMIT %s",
consistency_level=ConsistencyLevel.QUORUM,
fetch_size=int(num_statements/2))
parameters = [(i, ) for i in range(num_statements)]
results = execute_concurrent_with_args(self.session, statement, [(num_statements,)])
self.assertEqual(1, len(results))
self.assertTrue(results[0][0])
result = results[0][1]
self.assertIsInstance(result, PagedResult)
self.assertEqual(num_statements, sum(1 for _ in result))
def test_first_failure(self):
statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", ))
parameters = [(i, i) for i in range(100)]