Merge pull request #419 from datastax/318

PYTHON-318 - return trace data on ResultSet
This commit is contained in:
Michael Penick
2015-10-20 11:51:24 -07:00
13 changed files with 144 additions and 120 deletions

1
.gitignore vendored
View File

@@ -21,6 +21,7 @@ setuptools*.egg
cassandra/*.c
!cassandra/cmurmur3.c
cassandra/*.html
tests/unit/cython/bytesio_testhelper.c
# OSX
.DS_Store

View File

@@ -1605,34 +1605,14 @@ class Session(object):
no timeout. Please see :meth:`.ResponseFuture.result` for details on
the scope and effect of this timeout.
If `trace` is set to :const:`True`, an attempt will be made to
fetch the trace details and attach them to the `query`'s
:attr:`~.Statement.trace` attribute in the form of a :class:`.QueryTrace`
instance. This requires that `query` be a :class:`.Statement` subclass
instance and not just a string. If there is an error fetching the
trace details, the :attr:`~.Statement.trace` attribute will be left as
:const:`None`.
If `trace` is set to :const:`True`, the query will be sent with tracing enabled.
The trace details can be obtained using the returned :class:`.ResultSet` object.
`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
If `query` is a Statement with its own custom_payload. The message payload
will be a union of the two, with the values specified here taking precedence.
"""
if trace and not isinstance(query, Statement):
raise TypeError(
"The query argument must be an instance of a subclass of "
"cassandra.query.Statement when trace=True")
future = self.execute_async(query, parameters, trace, custom_payload, timeout)
try:
result = future.result()
finally:
if trace:
try:
query.trace = future.get_query_trace(self.max_trace_wait)
except Exception:
log.exception("Unable to fetch query trace:")
return result
return self.execute_async(query, parameters, trace, custom_payload, timeout).result()
def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET):
"""
@@ -1642,9 +1622,9 @@ class Session(object):
on the :class:`.ResponseFuture` to syncronously block for results at
any time.
If `trace` is set to :const:`True`, you may call
:meth:`.ResponseFuture.get_query_trace()` after the request
completes to retrieve a :class:`.QueryTrace` instance.
If `trace` is set to :const:`True`, you may get the query trace descriptors using
:meth:`.ResponseFuture.get_query_trace()` or :meth:`.ResponseFuture.get_all_query_traces()`
on the future result.
`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
If `query` is a Statement with its own custom_payload. The message payload
@@ -1734,8 +1714,7 @@ class Session(object):
query.batch_type, query._statements_and_parameters, cl,
query.serial_consistency_level, timestamp)
if trace:
message.tracing = True
message.tracing = trace
message.update_custom_payload(query.custom_payload)
message.update_custom_payload(custom_payload)
@@ -2714,7 +2693,7 @@ class ResponseFuture(object):
_req_id = None
_final_result = _NOT_SET
_final_exception = None
_query_trace = None
_query_traces = None
_callbacks = None
_errbacks = None
_current_host = None
@@ -2724,7 +2703,6 @@ class ResponseFuture(object):
_start_time = None
_metrics = None
_paging_state = None
_is_result_kind_rows = False
_custom_payload = None
_warnings = None
_timer = None
@@ -2906,9 +2884,9 @@ class ResponseFuture(object):
trace_id = getattr(response, 'trace_id', None)
if trace_id:
if self.query:
self.query.trace_id = trace_id
self._query_trace = QueryTrace(trace_id, self.session)
if not self._query_traces:
self._query_traces = []
self._query_traces.append(QueryTrace(trace_id, self.session))
self._warnings = getattr(response, 'warnings', None)
self._custom_payload = getattr(response, 'custom_payload', None)
@@ -2935,8 +2913,7 @@ class ResponseFuture(object):
self, **response.results)
else:
results = getattr(response, 'results', None)
self._is_result_kind_rows = response.kind ==RESULT_KIND_ROWS
if results is not None and self._is_result_kind_rows:
if results is not None and response.kind == RESULT_KIND_ROWS:
self._paging_state = response.paging_state
results = self.row_factory(*results)
self._set_final_result(results)
@@ -3203,27 +3180,37 @@ class ResponseFuture(object):
if not self._event.is_set():
self._on_timeout()
if self._final_result is not _NOT_SET:
if self._is_result_kind_rows:
return ResultSet(self, self._final_result)
else:
return self._final_result
return ResultSet(self, self._final_result)
else:
raise self._final_exception
def get_query_trace(self, max_wait=None):
"""
Returns the :class:`~.query.QueryTrace` instance representing a trace
of the last attempt for this operation, or :const:`None` if tracing was
not enabled for this query. Note that this may raise an exception if
there are problems retrieving the trace details from Cassandra. If the
trace is not available after `max_wait` seconds,
Fetches and returns the query trace of the last response, or `None` if tracing was
not enabled.
Note that this may raise an exception if there are problems retrieving the trace
details from Cassandra. If the trace is not available after `max_wait_sec`,
:exc:`cassandra.query.TraceUnavailable` will be raised.
"""
if not self._query_trace:
return None
if self._query_traces:
return self._get_query_trace(len(self._query_traces) - 1, max_wait)
self._query_trace.populate(max_wait)
return self._query_trace
def get_all_query_traces(self, max_wait_per=None):
"""
Fetches and returns the query traces for all query pages, if tracing was enabled.
See note in :meth:`~.get_query_trace` regarding possible exceptions.
"""
if self._query_traces:
return [self._get_query_trace(i, max_wait_per) for i in range(len(self._query_traces))]
return []
def _get_query_trace(self, i, max_wait):
trace = self._query_traces[i]
if not trace.events:
trace.populate(max_wait=max_wait)
return trace
def add_callback(self, fn, *args, **kwargs):
"""
@@ -3362,14 +3349,21 @@ class ResultSet(object):
def __init__(self, response_future, initial_response):
self.response_future = response_future
self._current_rows = initial_response
self._current_rows = initial_response or []
self._page_iter = None
self._list_mode = False
@property
def has_more_pages(self):
"""
True if the last response indicated more pages; False otherwise
"""
return self.response_future.has_more_pages
@property
def current_rows(self):
return self._current_rows or []
def __iter__(self):
if self._list_mode:
return iter(self._current_rows)
@@ -3385,15 +3379,21 @@ class ResultSet(object):
self._current_rows = []
raise
self.response_future.start_fetching_next_page()
result = self.response_future.result()
self._current_rows = result._current_rows
self.fetch_next_page()
self._page_iter = iter(self._current_rows)
return next(self._page_iter)
__next__ = next
def fetch_next_page(self):
if self.response_future.has_more_pages:
self.response_future.start_fetching_next_page()
result = self.response_future.result()
self._current_rows = result._current_rows
else:
self._current_rows = []
def _fetch_all(self):
self._current_rows = list(self)
self._page_iter = None
@@ -3420,3 +3420,17 @@ class ResultSet(object):
return bool(self._current_rows)
__bool__ = __nonzero__
def get_query_trace(self, max_wait_sec=None):
"""
Gets the last query trace from the associated future.
See :meth:`.ResponseFuture.get_query_trace` for details.
"""
return self.response_future.get_query_trace(max_wait_sec)
def get_all_query_traces(self, max_wait_sec_per=None):
"""
Gets all query traces from the associated future.
See :meth:`.ResponseFuture.get_all_query_traces` for details.
"""
return self.response_future.get_all_query_traces(max_wait_sec_per)

View File

@@ -134,10 +134,8 @@ class _ConcurrentExecutor(object):
self._put_result(e, idx, False)
def _on_success(self, result, future, idx):
if future._is_result_kind_rows:
result = ResultSet(future, result)
future.clear_callbacks()
self._put_result(result, idx, True)
future.clear_callbacks()
self._put_result(ResultSet(future, result), idx, True)
def _on_error(self, result, future, idx):
self._put_result(result, idx, False)

View File

@@ -175,18 +175,6 @@ class Statement(object):
will be retried.
"""
trace = None
"""
If :meth:`.Session.execute()` is run with `trace` set to :const:`True`,
this will be set to a :class:`.QueryTrace` instance.
"""
trace_id = None
"""
If :meth:`.Session.execute()` is run with `trace` set to :const:`True`,
this will be set to the tracing ID from the server.
"""
consistency_level = None
"""
The :class:`.ConsistencyLevel` to be used for this operation. Defaults

View File

@@ -129,6 +129,8 @@
.. automethod:: get_query_trace()
.. automethod:: get_all_query_traces()
.. autoattribute:: custom_payload()
.. autoattribute:: has_more_pages

View File

@@ -91,7 +91,7 @@ class ClientWarningTests(unittest.TestCase):
future.result()
self.assertEqual(len(future.warnings), 1)
self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*')
self.assertIsNotNone(future._query_trace)
self.assertIsNotNone(future.get_query_trace())
def test_warning_with_custom_payload(self):
"""
@@ -127,5 +127,5 @@ class ClientWarningTests(unittest.TestCase):
future.result()
self.assertEqual(len(future.warnings), 1)
self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*')
self.assertIsNotNone(future._query_trace)
self.assertIsNotNone(future.get_query_trace())
self.assertDictEqual(future.custom_payload, payload)

View File

@@ -78,7 +78,7 @@ class ClusterTests(unittest.TestCase):
CREATE KEYSPACE clustertests
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
""")
self.assertEqual(None, result)
self.assertFalse(result)
result = execute_until_pass(session,
"""
@@ -89,13 +89,13 @@ class ClusterTests(unittest.TestCase):
PRIMARY KEY (a, b)
)
""")
self.assertEqual(None, result)
self.assertFalse(result)
result = session.execute(
"""
INSERT INTO clustertests.cf0 (a, b, c) VALUES ('a', 'b', 'c')
""")
self.assertEqual(None, result)
self.assertFalse(result)
result = session.execute("SELECT * FROM clustertests.cf0")
self.assertEqual([('a', 'b', 'c')], result)
@@ -152,7 +152,7 @@ class ClusterTests(unittest.TestCase):
"""
INSERT INTO test3rf.test (k, v) VALUES (8889, 8889)
""")
self.assertEqual(None, result)
self.assertFalse(result)
result = session.execute("SELECT * FROM test3rf.test")
self.assertEqual([(8889, 8889)], result)
@@ -437,24 +437,25 @@ class ClusterTests(unittest.TestCase):
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
self.assertRaises(TypeError, session.execute, "SELECT * FROM system.local", trace=True)
def check_trace(trace):
self.assertIsNot(None, trace.request_type)
self.assertIsNot(None, trace.duration)
self.assertIsNot(None, trace.started_at)
self.assertIsNot(None, trace.coordinator)
self.assertIsNot(None, trace.events)
self.assertIsNotNone(trace.request_type)
self.assertIsNotNone(trace.duration)
self.assertIsNotNone(trace.started_at)
self.assertIsNotNone(trace.coordinator)
self.assertIsNotNone(trace.events)
result = session.execute( "SELECT * FROM system.local", trace=True)
check_trace(result.get_query_trace())
query = "SELECT * FROM system.local"
statement = SimpleStatement(query)
session.execute(statement, trace=True)
check_trace(statement.trace)
result = session.execute(statement, trace=True)
check_trace(result.get_query_trace())
query = "SELECT * FROM system.local"
statement = SimpleStatement(query)
session.execute(statement)
self.assertEqual(None, statement.trace)
result = session.execute(statement)
self.assertIsNone(result.get_query_trace())
statement2 = SimpleStatement(query)
future = session.execute_async(statement2, trace=True)
@@ -464,7 +465,7 @@ class ClusterTests(unittest.TestCase):
statement2 = SimpleStatement(query)
future = session.execute_async(statement2)
future.result()
self.assertEqual(None, future.get_query_trace())
self.assertIsNone(future.get_query_trace())
prepared = session.prepare("SELECT * FROM system.local")
future = session.execute_async(prepared, parameters=(), trace=True)

View File

@@ -89,7 +89,9 @@ class ClusterTests(unittest.TestCase):
results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, None)] * num_statements, results)
for success, result in results:
self.assertTrue(success)
self.assertFalse(result)
# read
statement = SimpleStatement(
@@ -111,7 +113,9 @@ class ClusterTests(unittest.TestCase):
results = self.execute_concurrent_args_helper(self.session, statement, parameters)
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, None)] * num_statements, results)
for success, result in results:
self.assertTrue(success)
self.assertFalse(result)
# read
statement = SimpleStatement(
@@ -143,8 +147,9 @@ class ClusterTests(unittest.TestCase):
parameters = [(i, i) for i in range(num_statements)]
results = self.execute_concurrent_args_helper(self.session, statement, parameters, results_generator=True)
for result in results:
self.assertEqual((True, None), result)
for success, result in results:
self.assertTrue(success)
self.assertFalse(result)
# read
statement = SimpleStatement(
@@ -172,7 +177,9 @@ class ClusterTests(unittest.TestCase):
results = self.execute_concurrent_args_helper(self.session, statement, parameters)
self.assertEqual(num_statements, len(results))
self.assertEqual([(True, None)] * num_statements, results)
for success, result in results:
self.assertTrue(success)
self.assertFalse(result)
# read
statement = SimpleStatement(
@@ -273,7 +280,7 @@ class ClusterTests(unittest.TestCase):
self.assertIsInstance(result, InvalidRequest)
else:
self.assertTrue(success)
self.assertEqual(None, result)
self.assertFalse(result)
def test_no_raise_on_first_failure_client_side(self):
statement = SimpleStatement(
@@ -292,4 +299,4 @@ class ClusterTests(unittest.TestCase):
self.assertIsInstance(result, TypeError)
else:
self.assertTrue(success)
self.assertEqual(None, result)
self.assertFalse(result)

View File

@@ -67,27 +67,34 @@ class QueryTests(unittest.TestCase):
query = "SELECT * FROM system.local"
statement = SimpleStatement(query)
session.execute(statement, trace=True)
rs = session.execute(statement, trace=True)
# Ensure this does not throw an exception
str(statement.trace)
for event in statement.trace.events:
trace = rs.get_query_trace()
self.assertTrue(trace.events)
str(trace)
for event in trace.events:
str(event)
cluster.shutdown()
def test_trace_id_to_query(self):
def test_trace_id_to_resultset(self):
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
query = "SELECT * FROM system.local"
statement = SimpleStatement(query)
self.assertIsNone(statement.trace_id)
future = session.execute_async(statement, trace=True)
future = session.execute_async("SELECT * FROM system.local", trace=True)
# query should have trace_id, even before trace is obtained
future.result()
self.assertIsNotNone(statement.trace_id)
# future should have the current trace
rs = future.result()
future_trace = future.get_query_trace()
self.assertIsNotNone(future_trace)
rs_trace = rs.get_query_trace()
self.assertEqual(rs_trace, future_trace)
self.assertTrue(rs_trace.events)
self.assertEqual(len(rs_trace.events), len(future_trace.events))
self.assertListEqual([rs_trace], rs.get_all_query_traces())
cluster.shutdown()
@@ -98,11 +105,13 @@ class QueryTests(unittest.TestCase):
query = "SELECT * FROM system.local"
statement = SimpleStatement(query)
session.execute(statement, trace=True)
rs = session.execute(statement, trace=True)
# Ensure this does not throw an exception
str(statement.trace)
for event in statement.trace.events:
trace = rs.get_query_trace()
self.assertTrue(trace.events)
str(trace)
for event in trace.events:
str(event)
cluster.shutdown()

View File

@@ -85,6 +85,7 @@ class RowFactoryTests(BasicSharedKeyspaceUnitTestCaseWFunctionTable):
result = session.execute(self.select)
self.assertIsInstance(result, ResultSet)
result = list(result)
for row in result:
self.assertEqual(row.k, row.v)

View File

@@ -32,7 +32,7 @@ class MockResponseResponseFuture():
and invoke callback with various timing.
"""
_is_result_kind_rows = False
_query_trace = None
# a list pending callbacks, these will be prioritized in reverse or normal orderd
pending_callbacks = PriorityQueue()
@@ -106,7 +106,7 @@ class TimedCallableInvoker(threading.Thread):
self._stopper.wait(.1)
callback_args = pending_callback[1]
fn, args, kwargs, time_added = callback_args
fn(time_added, *args, **kwargs)
fn([time_added], *args, **kwargs)
self._stopper.wait(.001)
return
@@ -222,8 +222,8 @@ class ConcurrencyTest((unittest.TestCase)):
:param results:
"""
last_time_added = 0
for result in results:
current_time_added = result[1]
for success, result in results:
self.assertTrue(success)
current_time_added = list(result)[0]
self.assertLess(last_time_added, current_time_added)
last_time_added = current_time_added

View File

@@ -94,7 +94,7 @@ class ResponseFutureTests(unittest.TestCase):
results="keyspace1")
rf._set_result(result)
rf._set_keyspace_completed({})
self.assertEqual(None, rf.result())
self.assertFalse(rf.result())
def test_schema_change_result(self):
session = self.make_session()
@@ -113,9 +113,9 @@ class ResponseFutureTests(unittest.TestCase):
session = self.make_session()
rf = self.make_response_future(session)
rf.send_request()
result = object()
result = [1, 2, 3]
rf._set_result(Mock(spec=ResultMessage, kind=999, results=result))
self.assertIs(result, rf.result())
self.assertListEqual(list(rf.result()), result)
def test_read_timeout_error_message(self):
session = self.make_session()
@@ -172,7 +172,7 @@ class ResponseFutureTests(unittest.TestCase):
result = Mock(spec=UnavailableErrorMessage, info={})
rf._set_result(result)
self.assertEqual(None, rf.result())
self.assertFalse(rf.result())
def test_retry_policy_says_retry(self):
session = self.make_session()

View File

@@ -37,7 +37,8 @@ class ResultSetTests(unittest.TestCase):
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
itr = iter(rs)
type(response_future).has_more_pages = PropertyMock(side_effect=(True, False)) # after init to avoid side effects being consumed by init
# this is brittle, depends on internal impl details. Would like to find a better way
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # after init to avoid side effects being consumed by init
self.assertListEqual(list(itr), expected)
def test_list_non_paged(self):
@@ -54,7 +55,8 @@ class ResultSetTests(unittest.TestCase):
response_future = Mock(has_more_pages=True)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # one True for getitem check/warn, then True, False for two pages
# this is brittle, depends on internal impl details. Would like to find a better way
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode
self.assertEqual(rs[9], expected[9])
self.assertEqual(list(rs), expected)
@@ -119,7 +121,8 @@ class ResultSetTests(unittest.TestCase):
response_future = Mock(has_more_pages=True)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # First True is consumed on check entering list mode
# this is brittle, depends on internal impl details. Would like to find a better way
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode
# index access before iteration causes list to be materialized
self.assertEqual(rs[0], expected[0])
self.assertEqual(rs[9], expected[9])
@@ -146,7 +149,7 @@ class ResultSetTests(unittest.TestCase):
response_future = Mock(has_more_pages=True)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False))
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False))
# eq before iteration causes list to be materialized
self.assertEqual(rs, expected)