From 5300d231c9887d07e7cf4aa7865f70a1e067ea9e Mon Sep 17 00:00:00 2001 From: Samuel Charron Date: Thu, 14 Aug 2014 17:40:32 +0200 Subject: [PATCH 1/2] Keep the timeout for paged results --- cassandra/cluster.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index fa5defd1..b4b1e35b 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2730,7 +2730,7 @@ class ResponseFuture(object): if self._paging_state is None: return self._final_result else: - return PagedResult(self, self._final_result) + return PagedResult(self, self._final_result, timeout) elif self._final_exception: raise self._final_exception else: @@ -2739,7 +2739,7 @@ class ResponseFuture(object): if self._paging_state is None: return self._final_result else: - return PagedResult(self, self._final_result) + return PagedResult(self, self._final_result, timeout) elif self._final_exception: raise self._final_exception else: @@ -2893,9 +2893,10 @@ class PagedResult(object): response_future = None - def __init__(self, response_future, initial_response): + def __init__(self, response_future, initial_response, timeout): self.response_future = response_future self.current_response = iter(initial_response) + self.timeout = timeout def __iter__(self): return self @@ -2908,7 +2909,7 @@ class PagedResult(object): raise self.response_future.start_fetching_next_page() - result = self.response_future.result() + result = self.response_future.result(self.timeout) if self.response_future.has_more_pages: self.current_response = result.current_response else: From b4318bf580114050ebef88699474e36b640c7bdf Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 14 Aug 2014 15:02:40 -0500 Subject: [PATCH 2/2] Tweaks to #182. Set default timeout, add test. Add default timeout for PagedResult class Add integration test for concurrent with PagedResult Minor comment correction --- cassandra/cluster.py | 2 +- cassandra/concurrent.py | 2 +- tests/integration/standard/test_concurrent.py | 29 +++++++++++++++++-- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index b4b1e35b..1465f8de 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2893,7 +2893,7 @@ class PagedResult(object): response_future = None - def __init__(self, response_future, initial_response, timeout): + def __init__(self, response_future, initial_response, timeout=_NOT_SET): self.response_future = response_future self.current_response = iter(initial_response) self.timeout = timeout diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index 0f646da8..4ddef1ba 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -31,7 +31,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais ``parameters`` item must be a sequence or :const:`None`. A sequence of ``(success, result_or_exc)`` tuples is returned in the same - order that the statements were passed in. If ``success`` if :const:`False`, + order that the statements were passed in. If ``success`` is :const:`False`, there was an error executing the statement, and ``result_or_exc`` will be an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc`` will be the query result. diff --git a/tests/integration/standard/test_concurrent.py b/tests/integration/standard/test_concurrent.py index 35804db1..d8b7aac9 100644 --- a/tests/integration/standard/test_concurrent.py +++ b/tests/integration/standard/test_concurrent.py @@ -17,12 +17,12 @@ from tests.integration import PROTOCOL_VERSION try: import unittest2 as unittest except ImportError: - import unittest # noqa + import unittest # noqa from itertools import cycle from cassandra import InvalidRequest, ConsistencyLevel -from cassandra.cluster import Cluster +from cassandra.cluster import Cluster, PagedResult from cassandra.concurrent import (execute_concurrent, execute_concurrent_with_args) from cassandra.policies import HostDistance @@ -83,6 +83,31 @@ class ClusterTests(unittest.TestCase): self.assertEqual(num_statements, len(results)) self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results) + def test_execute_concurrent_paged_result(self): + num_statements = 201 + statement = SimpleStatement( + "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", + consistency_level=ConsistencyLevel.QUORUM) + parameters = [(i, i) for i in range(num_statements)] + + results = execute_concurrent_with_args(self.session, statement, parameters) + self.assertEqual(num_statements, len(results)) + self.assertEqual([(True, None)] * num_statements, results) + + # read + statement = SimpleStatement( + "SELECT * FROM test3rf.test LIMIT %s", + consistency_level=ConsistencyLevel.QUORUM, + fetch_size=int(num_statements/2)) + parameters = [(i, ) for i in range(num_statements)] + + results = execute_concurrent_with_args(self.session, statement, [(num_statements,)]) + self.assertEqual(1, len(results)) + self.assertTrue(results[0][0]) + result = results[0][1] + self.assertIsInstance(result, PagedResult) + self.assertEqual(num_statements, sum(1 for _ in result)) + def test_first_failure(self): statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", )) parameters = [(i, i) for i in range(100)]