From 97adeca31f40ec73546bde5e41a20b739f34f5bc Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 12 Oct 2015 15:02:30 -0500 Subject: [PATCH 01/10] surface query trace in ResultSet API PYTHON-318 --- cassandra/cluster.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index ef41b582..9dae0370 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3218,7 +3218,8 @@ class ResponseFuture(object): if not self._query_trace: return None - self._query_trace.populate(max_wait) + if not self._query_trace.events: + self._query_trace.populate(max_wait) return self._query_trace def add_callback(self, fn, *args, **kwargs): @@ -3361,9 +3362,13 @@ class ResultSet(object): self._current_rows = initial_response self._page_iter = None self._list_mode = False + self._traces = [response_future._query_trace] if response_future._query_trace else [] @property def has_more_pages(self): + """ + True if the last response indicated more pages; False otherwise + """ return self.response_future.has_more_pages def __iter__(self): @@ -3383,6 +3388,8 @@ class ResultSet(object): self.response_future.start_fetching_next_page() result = self.response_future.result() + if self.response_future._query_trace: + self._traces.append(self.response_future._query_trace) self._current_rows = result._current_rows self._page_iter = iter(self._current_rows) @@ -3416,3 +3423,29 @@ class ResultSet(object): return bool(self._current_rows) __bool__ = __nonzero__ + + def get_query_trace(self, max_wait_sec=None): + """ + Fetches and returns the query trace of the last response, or `None` if tracing was + not enabled. + + Note that this may raise an exception if there are problems retrieving the trace + details from Cassandra. If the trace is not available after `max_wait_sec`, + :exc:`cassandra.query.TraceUnavailable` will be raised. + """ + if self._traces: + self._get_trace(0, max_wait_sec) + + def get_all_query_traces(self, max_wait_sec=None): + """ + Fetches and returns the query traces for all query pages, if tracing was enabled. + + See note in :meth:`~.get_current_query_trace` regarding possible exceptions. + """ + return [self._get_trace(i, max_wait_sec) for i in range(len(self._traces))] + + def _get_trace(self, i, max_wait): + trace = self._traces[i] + if not trace.events: + trace.populate(max_wait=max_wait) + return trace From 7f7b8cc280fed6332ad73a85a51d3594b8ce3433 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 12 Oct 2015 15:21:49 -0500 Subject: [PATCH 02/10] always return ResultSet, even for non-row responses --- cassandra/cluster.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 9dae0370..6749085f 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2720,7 +2720,6 @@ class ResponseFuture(object): _start_time = None _metrics = None _paging_state = None - _is_result_kind_rows = False _custom_payload = None _warnings = None _timer = None @@ -2931,8 +2930,7 @@ class ResponseFuture(object): self, **response.results) else: results = getattr(response, 'results', None) - self._is_result_kind_rows = response.kind ==RESULT_KIND_ROWS - if results is not None and self._is_result_kind_rows: + if results is not None and response.kind ==RESULT_KIND_ROWS: self._paging_state = response.paging_state results = self.row_factory(*results) self._set_final_result(results) @@ -3199,10 +3197,7 @@ class ResponseFuture(object): if not self._event.is_set(): self._on_timeout() if self._final_result is not _NOT_SET: - if self._is_result_kind_rows: - return ResultSet(self, self._final_result) - else: - return self._final_result + return ResultSet(self, self._final_result) else: raise self._final_exception @@ -3359,7 +3354,7 @@ class ResultSet(object): def __init__(self, response_future, initial_response): self.response_future = response_future - self._current_rows = initial_response + self._current_rows = initial_response or [] self._page_iter = None self._list_mode = False self._traces = [response_future._query_trace] if response_future._query_trace else [] From 9ca9985f703a872cf6bada1ed4949bf70f04d9c0 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 13 Oct 2015 11:42:35 -0500 Subject: [PATCH 03/10] Trace info is now attached to ResultSet --- cassandra/cluster.py | 37 ++++--------------- cassandra/concurrent.py | 6 +-- tests/integration/standard/test_cluster.py | 27 +++++++------- tests/integration/standard/test_concurrent.py | 21 +++++++---- tests/integration/standard/test_query.py | 37 ++++++++++++------- .../standard/test_row_factories.py | 1 + tests/unit/test_concurrent.py | 10 ++--- tests/unit/test_response_future.py | 8 ++-- 8 files changed, 71 insertions(+), 76 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 6749085f..8ef7b661 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1601,34 +1601,14 @@ class Session(object): no timeout. Please see :meth:`.ResponseFuture.result` for details on the scope and effect of this timeout. - If `trace` is set to :const:`True`, an attempt will be made to - fetch the trace details and attach them to the `query`'s - :attr:`~.Statement.trace` attribute in the form of a :class:`.QueryTrace` - instance. This requires that `query` be a :class:`.Statement` subclass - instance and not just a string. If there is an error fetching the - trace details, the :attr:`~.Statement.trace` attribute will be left as - :const:`None`. + If `trace` is set to :const:`True`, the query will be sent with tracing enabled. + The trace details can be obtained using the returned :class:`.ResultSet` object. `custom_payload` is a :ref:`custom_payload` dict to be passed to the server. If `query` is a Statement with its own custom_payload. The message payload will be a union of the two, with the values specified here taking precedence. """ - if trace and not isinstance(query, Statement): - raise TypeError( - "The query argument must be an instance of a subclass of " - "cassandra.query.Statement when trace=True") - - future = self.execute_async(query, parameters, trace, custom_payload, timeout) - try: - result = future.result() - finally: - if trace: - try: - query.trace = future.get_query_trace(self.max_trace_wait) - except Exception: - log.exception("Unable to fetch query trace:") - - return result + return self.execute_async(query, parameters, trace, custom_payload, timeout).result() def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET): """ @@ -1638,9 +1618,9 @@ class Session(object): on the :class:`.ResponseFuture` to syncronously block for results at any time. - If `trace` is set to :const:`True`, you may call - :meth:`.ResponseFuture.get_query_trace()` after the request - completes to retrieve a :class:`.QueryTrace` instance. + If `trace` is set to :const:`True`, you may get the query trace descriptors using + :meth:`.ResultSet.get_query_trace()` or :meth:`.ResultSet.get_all_query_traces()` + on the future result. `custom_payload` is a :ref:`custom_payload` dict to be passed to the server. If `query` is a Statement with its own custom_payload. The message payload @@ -1730,8 +1710,7 @@ class Session(object): query.batch_type, query._statements_and_parameters, cl, query.serial_consistency_level, timestamp) - if trace: - message.tracing = True + message.tracing = trace message.update_custom_payload(query.custom_payload) message.update_custom_payload(custom_payload) @@ -3429,7 +3408,7 @@ class ResultSet(object): :exc:`cassandra.query.TraceUnavailable` will be raised. """ if self._traces: - self._get_trace(0, max_wait_sec) + return self._get_trace(0, max_wait_sec) def get_all_query_traces(self, max_wait_sec=None): """ diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index 3391fdea..a360ffaa 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -134,10 +134,8 @@ class _ConcurrentExecutor(object): self._put_result(e, idx, False) def _on_success(self, result, future, idx): - if future._is_result_kind_rows: - result = ResultSet(future, result) - future.clear_callbacks() - self._put_result(result, idx, True) + future.clear_callbacks() + self._put_result(ResultSet(future, result), idx, True) def _on_error(self, result, future, idx): self._put_result(result, idx, False) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 5fe97886..97adb45f 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -78,7 +78,7 @@ class ClusterTests(unittest.TestCase): CREATE KEYSPACE clustertests WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} """) - self.assertEqual(None, result) + self.assertFalse(result) result = execute_until_pass(session, """ @@ -89,13 +89,13 @@ class ClusterTests(unittest.TestCase): PRIMARY KEY (a, b) ) """) - self.assertEqual(None, result) + self.assertFalse(result) result = session.execute( """ INSERT INTO clustertests.cf0 (a, b, c) VALUES ('a', 'b', 'c') """) - self.assertEqual(None, result) + self.assertFalse(result) result = session.execute("SELECT * FROM clustertests.cf0") self.assertEqual([('a', 'b', 'c')], result) @@ -152,7 +152,7 @@ class ClusterTests(unittest.TestCase): """ INSERT INTO test3rf.test (k, v) VALUES (8889, 8889) """) - self.assertEqual(None, result) + self.assertFalse(result) result = session.execute("SELECT * FROM test3rf.test") self.assertEqual([(8889, 8889)], result) @@ -437,8 +437,6 @@ class ClusterTests(unittest.TestCase): cluster = Cluster(protocol_version=PROTOCOL_VERSION) session = cluster.connect() - self.assertRaises(TypeError, session.execute, "SELECT * FROM system.local", trace=True) - def check_trace(trace): self.assertIsNot(None, trace.request_type) self.assertIsNot(None, trace.duration) @@ -446,15 +444,18 @@ class ClusterTests(unittest.TestCase): self.assertIsNot(None, trace.coordinator) self.assertIsNot(None, trace.events) - query = "SELECT * FROM system.local" - statement = SimpleStatement(query) - session.execute(statement, trace=True) - check_trace(statement.trace) + result = session.execute( "SELECT * FROM system.local", trace=True) + check_trace(result.get_query_trace()) query = "SELECT * FROM system.local" statement = SimpleStatement(query) - session.execute(statement) - self.assertEqual(None, statement.trace) + result = session.execute(statement, trace=True) + check_trace(result.get_query_trace()) + + query = "SELECT * FROM system.local" + statement = SimpleStatement(query) + result = session.execute(statement) + self.assertIsNone(result.get_query_trace()) statement2 = SimpleStatement(query) future = session.execute_async(statement2, trace=True) @@ -464,7 +465,7 @@ class ClusterTests(unittest.TestCase): statement2 = SimpleStatement(query) future = session.execute_async(statement2) future.result() - self.assertEqual(None, future.get_query_trace()) + self.assertIsNone(future.get_query_trace()) prepared = session.prepare("SELECT * FROM system.local") future = session.execute_async(prepared, parameters=(), trace=True) diff --git a/tests/integration/standard/test_concurrent.py b/tests/integration/standard/test_concurrent.py index 23b810c5..a0cf79fa 100644 --- a/tests/integration/standard/test_concurrent.py +++ b/tests/integration/standard/test_concurrent.py @@ -89,7 +89,9 @@ class ClusterTests(unittest.TestCase): results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters))) self.assertEqual(num_statements, len(results)) - self.assertEqual([(True, None)] * num_statements, results) + for success, result in results: + self.assertTrue(success) + self.assertFalse(result) # read statement = SimpleStatement( @@ -111,7 +113,9 @@ class ClusterTests(unittest.TestCase): results = self.execute_concurrent_args_helper(self.session, statement, parameters) self.assertEqual(num_statements, len(results)) - self.assertEqual([(True, None)] * num_statements, results) + for success, result in results: + self.assertTrue(success) + self.assertFalse(result) # read statement = SimpleStatement( @@ -143,8 +147,9 @@ class ClusterTests(unittest.TestCase): parameters = [(i, i) for i in range(num_statements)] results = self.execute_concurrent_args_helper(self.session, statement, parameters, results_generator=True) - for result in results: - self.assertEqual((True, None), result) + for success, result in results: + self.assertTrue(success) + self.assertFalse(result) # read statement = SimpleStatement( @@ -172,7 +177,9 @@ class ClusterTests(unittest.TestCase): results = self.execute_concurrent_args_helper(self.session, statement, parameters) self.assertEqual(num_statements, len(results)) - self.assertEqual([(True, None)] * num_statements, results) + for success, result in results: + self.assertTrue(success) + self.assertFalse(result) # read statement = SimpleStatement( @@ -273,7 +280,7 @@ class ClusterTests(unittest.TestCase): self.assertIsInstance(result, InvalidRequest) else: self.assertTrue(success) - self.assertEqual(None, result) + self.assertFalse(result) def test_no_raise_on_first_failure_client_side(self): statement = SimpleStatement( @@ -292,4 +299,4 @@ class ClusterTests(unittest.TestCase): self.assertIsInstance(result, TypeError) else: self.assertTrue(success) - self.assertEqual(None, result) + self.assertFalse(result) diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index 2364a49f..fbb206b1 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -65,27 +65,34 @@ class QueryTests(unittest.TestCase): query = "SELECT * FROM system.local" statement = SimpleStatement(query) - session.execute(statement, trace=True) + rs = session.execute(statement, trace=True) # Ensure this does not throw an exception - str(statement.trace) - for event in statement.trace.events: + trace = rs.get_query_trace() + self.assertTrue(trace.events) + str(trace) + for event in trace.events: str(event) cluster.shutdown() - def test_trace_id_to_query(self): + def test_trace_id_to_resultset(self): cluster = Cluster(protocol_version=PROTOCOL_VERSION) session = cluster.connect() - query = "SELECT * FROM system.local" - statement = SimpleStatement(query) - self.assertIsNone(statement.trace_id) - future = session.execute_async(statement, trace=True) + future = session.execute_async("SELECT * FROM system.local", trace=True) - # query should have trace_id, even before trace is obtained - future.result() - self.assertIsNotNone(statement.trace_id) + # future should have the current trace + rs = future.result() + future_trace = future.get_query_trace() + self.assertIsNotNone(future_trace) + + rs_trace = rs.get_query_trace() + self.assertEqual(rs_trace, future_trace) + self.assertTrue(rs_trace.events) + self.assertEqual(len(rs_trace.events), len(future_trace.events)) + + self.assertListEqual([rs_trace], rs.get_all_query_traces()) cluster.shutdown() @@ -96,11 +103,13 @@ class QueryTests(unittest.TestCase): query = "SELECT * FROM system.local" statement = SimpleStatement(query) - session.execute(statement, trace=True) + rs = session.execute(statement, trace=True) # Ensure this does not throw an exception - str(statement.trace) - for event in statement.trace.events: + trace = rs.get_query_trace() + self.assertTrue(trace.events) + str(trace) + for event in trace.events: str(event) cluster.shutdown() diff --git a/tests/integration/standard/test_row_factories.py b/tests/integration/standard/test_row_factories.py index d4391daf..c43fe57e 100644 --- a/tests/integration/standard/test_row_factories.py +++ b/tests/integration/standard/test_row_factories.py @@ -94,6 +94,7 @@ class RowFactoryTests(unittest.TestCase): result = session.execute(self.select) self.assertIsInstance(result, ResultSet) + result = list(result) for row in result: self.assertEqual(row.k, row.v) diff --git a/tests/unit/test_concurrent.py b/tests/unit/test_concurrent.py index 7465f685..3c2734e4 100644 --- a/tests/unit/test_concurrent.py +++ b/tests/unit/test_concurrent.py @@ -32,7 +32,7 @@ class MockResponseResponseFuture(): and invoke callback with various timing. """ - _is_result_kind_rows = False + _query_trace = None # a list pending callbacks, these will be prioritized in reverse or normal orderd pending_callbacks = PriorityQueue() @@ -106,7 +106,7 @@ class TimedCallableInvoker(threading.Thread): self._stopper.wait(.1) callback_args = pending_callback[1] fn, args, kwargs, time_added = callback_args - fn(time_added, *args, **kwargs) + fn([time_added], *args, **kwargs) self._stopper.wait(.001) return @@ -222,8 +222,8 @@ class ConcurrencyTest((unittest.TestCase)): :param results: """ last_time_added = 0 - for result in results: - current_time_added = result[1] + for success, result in results: + self.assertTrue(success) + current_time_added = list(result)[0] self.assertLess(last_time_added, current_time_added) last_time_added = current_time_added - diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 3c41c53c..5c59259b 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -94,7 +94,7 @@ class ResponseFutureTests(unittest.TestCase): results="keyspace1") rf._set_result(result) rf._set_keyspace_completed({}) - self.assertEqual(None, rf.result()) + self.assertFalse(rf.result()) def test_schema_change_result(self): session = self.make_session() @@ -113,9 +113,9 @@ class ResponseFutureTests(unittest.TestCase): session = self.make_session() rf = self.make_response_future(session) rf.send_request() - result = object() + result = [1, 2, 3] rf._set_result(Mock(spec=ResultMessage, kind=999, results=result)) - self.assertIs(result, rf.result()) + self.assertListEqual(list(rf.result()), result) def test_read_timeout_error_message(self): session = self.make_session() @@ -172,7 +172,7 @@ class ResponseFutureTests(unittest.TestCase): result = Mock(spec=UnavailableErrorMessage, info={}) rf._set_result(result) - self.assertEqual(None, rf.result()) + self.assertFalse(rf.result()) def test_retry_policy_says_retry(self): session = self.make_session() From ead4f2fc8a478bc7bd85f2fcb82699fb7aaff593 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 13 Oct 2015 11:43:05 -0500 Subject: [PATCH 04/10] ignore generated test artifact --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a874ebce..5c9cbec9 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ setuptools*.egg cassandra/*.c !cassandra/cmurmur3.c cassandra/*.html +tests/unit/cython/bytesio_testhelper.c # OSX .DS_Store From 932a1d5272d075f945ea5659a2a642e3e3cdcb58 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 13 Oct 2015 11:43:23 -0500 Subject: [PATCH 05/10] Remove trace, trace_id from Statement --- cassandra/cluster.py | 2 -- cassandra/query.py | 12 ------------ 2 files changed, 14 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 8ef7b661..208d80de 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2880,8 +2880,6 @@ class ResponseFuture(object): trace_id = getattr(response, 'trace_id', None) if trace_id: - if self.query: - self.query.trace_id = trace_id self._query_trace = QueryTrace(trace_id, self.session) self._warnings = getattr(response, 'warnings', None) diff --git a/cassandra/query.py b/cassandra/query.py index 0ba6c594..f1e0fd85 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -175,18 +175,6 @@ class Statement(object): will be retried. """ - trace = None - """ - If :meth:`.Session.execute()` is run with `trace` set to :const:`True`, - this will be set to a :class:`.QueryTrace` instance. - """ - - trace_id = None - """ - If :meth:`.Session.execute()` is run with `trace` set to :const:`True`, - this will be set to the tracing ID from the server. - """ - consistency_level = None """ The :class:`.ConsistencyLevel` to be used for this operation. Defaults From 183c8edb16f7509d956e63b13d4c321e104f0438 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 13 Oct 2015 17:03:23 -0500 Subject: [PATCH 06/10] query trace tracing all in ResponseFuture also add manual page fetching for ResultSet --- cassandra/cluster.py | 83 ++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 208d80de..c2363533 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1619,7 +1619,7 @@ class Session(object): any time. If `trace` is set to :const:`True`, you may get the query trace descriptors using - :meth:`.ResultSet.get_query_trace()` or :meth:`.ResultSet.get_all_query_traces()` + :meth:`.ResponseFuture.get_query_trace()` or :meth:`.ResponseFuture.get_all_query_traces()` on the future result. `custom_payload` is a :ref:`custom_payload` dict to be passed to the server. @@ -2689,7 +2689,7 @@ class ResponseFuture(object): _req_id = None _final_result = _NOT_SET _final_exception = None - _query_trace = None + _query_traces = None _callbacks = None _errbacks = None _current_host = None @@ -2880,7 +2880,9 @@ class ResponseFuture(object): trace_id = getattr(response, 'trace_id', None) if trace_id: - self._query_trace = QueryTrace(trace_id, self.session) + if not self._query_traces: + self._query_traces = [] + self._query_traces.append(QueryTrace(trace_id, self.session)) self._warnings = getattr(response, 'warnings', None) self._custom_payload = getattr(response, 'custom_payload', None) @@ -3180,19 +3182,31 @@ class ResponseFuture(object): def get_query_trace(self, max_wait=None): """ - Returns the :class:`~.query.QueryTrace` instance representing a trace - of the last attempt for this operation, or :const:`None` if tracing was - not enabled for this query. Note that this may raise an exception if - there are problems retrieving the trace details from Cassandra. If the - trace is not available after `max_wait` seconds, + Fetches and returns the query trace of the last response, or `None` if tracing was + not enabled. + + Note that this may raise an exception if there are problems retrieving the trace + details from Cassandra. If the trace is not available after `max_wait_sec`, :exc:`cassandra.query.TraceUnavailable` will be raised. """ - if not self._query_trace: - return None + if self._query_traces: + return self._get_query_trace(len(self._query_traces) - 1, max_wait) - if not self._query_trace.events: - self._query_trace.populate(max_wait) - return self._query_trace + def get_all_query_traces(self, max_wait_per=None): + """ + Fetches and returns the query traces for all query pages, if tracing was enabled. + + See note in :meth:`~.get_current_query_trace` regarding possible exceptions. + """ + if self._query_traces: + return [self._get_query_trace(i, max_wait_per) for i in range(len(self._query_traces))] + return [] + + def _get_query_trace(self, i, max_wait): + trace = self._query_traces[i] + if not trace.events: + trace.populate(max_wait=max_wait) + return trace def add_callback(self, fn, *args, **kwargs): """ @@ -3334,7 +3348,6 @@ class ResultSet(object): self._current_rows = initial_response or [] self._page_iter = None self._list_mode = False - self._traces = [response_future._query_trace] if response_future._query_trace else [] @property def has_more_pages(self): @@ -3343,6 +3356,10 @@ class ResultSet(object): """ return self.response_future.has_more_pages + @property + def current_rows(self): + return self._current_rows or [] + def __iter__(self): if self._list_mode: return iter(self._current_rows) @@ -3358,17 +3375,19 @@ class ResultSet(object): self._current_rows = [] raise - self.response_future.start_fetching_next_page() - result = self.response_future.result() - if self.response_future._query_trace: - self._traces.append(self.response_future._query_trace) - self._current_rows = result._current_rows + self.fetch_next_page() self._page_iter = iter(self._current_rows) return next(self._page_iter) __next__ = next + def fetch_next_page(self): + if self.response_future.has_more_pages: + self.response_future.start_fetching_next_page() + result = self.response_future.result() + self._current_rows = result._current_rows + def _fetch_all(self): self._current_rows = list(self) self._page_iter = None @@ -3395,29 +3414,3 @@ class ResultSet(object): return bool(self._current_rows) __bool__ = __nonzero__ - - def get_query_trace(self, max_wait_sec=None): - """ - Fetches and returns the query trace of the last response, or `None` if tracing was - not enabled. - - Note that this may raise an exception if there are problems retrieving the trace - details from Cassandra. If the trace is not available after `max_wait_sec`, - :exc:`cassandra.query.TraceUnavailable` will be raised. - """ - if self._traces: - return self._get_trace(0, max_wait_sec) - - def get_all_query_traces(self, max_wait_sec=None): - """ - Fetches and returns the query traces for all query pages, if tracing was enabled. - - See note in :meth:`~.get_current_query_trace` regarding possible exceptions. - """ - return [self._get_trace(i, max_wait_sec) for i in range(len(self._traces))] - - def _get_trace(self, i, max_wait): - trace = self._traces[i] - if not trace.events: - trace.populate(max_wait=max_wait) - return trace From 88f0afe9b4c1774f3caf9173ef40412da10fcd10 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 14 Oct 2015 10:15:33 -0500 Subject: [PATCH 07/10] Add ResultSet get_*query*trace* pass-through to future --- cassandra/cluster.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index c2363533..83307208 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3196,7 +3196,7 @@ class ResponseFuture(object): """ Fetches and returns the query traces for all query pages, if tracing was enabled. - See note in :meth:`~.get_current_query_trace` regarding possible exceptions. + See note in :meth:`~.get_query_trace` regarding possible exceptions. """ if self._query_traces: return [self._get_query_trace(i, max_wait_per) for i in range(len(self._query_traces))] @@ -3387,6 +3387,8 @@ class ResultSet(object): self.response_future.start_fetching_next_page() result = self.response_future.result() self._current_rows = result._current_rows + else: + self._current_rows = [] def _fetch_all(self): self._current_rows = list(self) @@ -3414,3 +3416,17 @@ class ResultSet(object): return bool(self._current_rows) __bool__ = __nonzero__ + + def get_query_trace(self, max_wait_sec=None): + """ + Gets the last query trace from the associated future. + See :meth:`.ResponseFuture.get_query_trace` for details. + """ + return self.response_future.get_query_trace(max_wait_sec) + + def get_all_query_traces(self, max_wait_sec_per=None): + """ + Gets all query traces from the associated future. + See :meth:`.ResponseFuture.get_all_query_traces` for details. + """ + return self.response_future.get_all_query_traces(max_wait_sec_per) From 5957a65ec12167cd83eda10a06ff87c24f3db7ae Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 14 Oct 2015 10:18:05 -0500 Subject: [PATCH 08/10] ResponseFuture doc updates --- docs/api/cassandra/cluster.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index 3ea0d230..8075476b 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -126,6 +126,8 @@ .. automethod:: get_query_trace() + .. automethod:: get_all_query_traces() + .. autoattribute:: custom_payload() .. autoattribute:: has_more_pages From 385baa1659df81d95b81a6582a10c455f25a181e Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 14 Oct 2015 10:18:23 -0500 Subject: [PATCH 09/10] Trace test updates for PYTHON-318 --- tests/integration/standard/test_client_warnings.py | 4 ++-- tests/integration/standard/test_cluster.py | 10 +++++----- tests/unit/test_resultset.py | 11 +++++++---- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/tests/integration/standard/test_client_warnings.py b/tests/integration/standard/test_client_warnings.py index 4316c939..90f22482 100644 --- a/tests/integration/standard/test_client_warnings.py +++ b/tests/integration/standard/test_client_warnings.py @@ -91,7 +91,7 @@ class ClientWarningTests(unittest.TestCase): future.result() self.assertEqual(len(future.warnings), 1) self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*') - self.assertIsNotNone(future._query_trace) + self.assertIsNotNone(future.get_query_trace()) def test_warning_with_custom_payload(self): """ @@ -127,5 +127,5 @@ class ClientWarningTests(unittest.TestCase): future.result() self.assertEqual(len(future.warnings), 1) self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*') - self.assertIsNotNone(future._query_trace) + self.assertIsNotNone(future.get_query_trace()) self.assertDictEqual(future.custom_payload, payload) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 97adb45f..d6d1394f 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -438,11 +438,11 @@ class ClusterTests(unittest.TestCase): session = cluster.connect() def check_trace(trace): - self.assertIsNot(None, trace.request_type) - self.assertIsNot(None, trace.duration) - self.assertIsNot(None, trace.started_at) - self.assertIsNot(None, trace.coordinator) - self.assertIsNot(None, trace.events) + self.assertIsNotNone(trace.request_type) + self.assertIsNotNone(trace.duration) + self.assertIsNotNone(trace.started_at) + self.assertIsNotNone(trace.coordinator) + self.assertIsNotNone(trace.events) result = session.execute( "SELECT * FROM system.local", trace=True) check_trace(result.get_query_trace()) diff --git a/tests/unit/test_resultset.py b/tests/unit/test_resultset.py index bb85de47..2a683767 100644 --- a/tests/unit/test_resultset.py +++ b/tests/unit/test_resultset.py @@ -37,7 +37,8 @@ class ResultSetTests(unittest.TestCase): response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock rs = ResultSet(response_future, expected[:5]) itr = iter(rs) - type(response_future).has_more_pages = PropertyMock(side_effect=(True, False)) # after init to avoid side effects being consumed by init + # this is brittle, depends on internal impl details. Would like to find a better way + type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # after init to avoid side effects being consumed by init self.assertListEqual(list(itr), expected) def test_list_non_paged(self): @@ -54,7 +55,8 @@ class ResultSetTests(unittest.TestCase): response_future = Mock(has_more_pages=True) response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock rs = ResultSet(response_future, expected[:5]) - type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # one True for getitem check/warn, then True, False for two pages + # this is brittle, depends on internal impl details. Would like to find a better way + type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode self.assertEqual(rs[9], expected[9]) self.assertEqual(list(rs), expected) @@ -119,7 +121,8 @@ class ResultSetTests(unittest.TestCase): response_future = Mock(has_more_pages=True) response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock rs = ResultSet(response_future, expected[:5]) - type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # First True is consumed on check entering list mode + # this is brittle, depends on internal impl details. Would like to find a better way + type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode # index access before iteration causes list to be materialized self.assertEqual(rs[0], expected[0]) self.assertEqual(rs[9], expected[9]) @@ -146,7 +149,7 @@ class ResultSetTests(unittest.TestCase): response_future = Mock(has_more_pages=True) response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock rs = ResultSet(response_future, expected[:5]) - type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) + type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # eq before iteration causes list to be materialized self.assertEqual(rs, expected) From 8ecb8d483a41d4ae8fc6470cce410f934861db3e Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Tue, 20 Oct 2015 11:50:59 -0700 Subject: [PATCH 10/10] Added space back in condition --- cassandra/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 83307208..a5839d9c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2909,7 +2909,7 @@ class ResponseFuture(object): self, **response.results) else: results = getattr(response, 'results', None) - if results is not None and response.kind ==RESULT_KIND_ROWS: + if results is not None and response.kind == RESULT_KIND_ROWS: self._paging_state = response.paging_state results = self.row_factory(*results) self._set_final_result(results)