Exponential trace fetch backoff, now configurable, test
This commit is contained in:
@@ -911,6 +911,17 @@ class Session(object):
|
||||
timeout, neither the registered callback nor errback will be called.
|
||||
"""
|
||||
|
||||
max_trace_wait = 2.0
|
||||
"""
|
||||
The maximum amount of time (in seconds) the driver will wait for trace
|
||||
details to be populated server-side for a query before giving up.
|
||||
If the `trace` parameter for :meth:`~.execute()` or :meth:`~.execute_async()`
|
||||
is :const:`True`, the driver will repeatedly attempt to fetch trace
|
||||
details for the query (using exponential backoff) until this limit is
|
||||
hit. If the limit is exceeded, :exc:`cassandra.query.TraceUnavailable`
|
||||
will be raised.
|
||||
"""
|
||||
|
||||
_lock = None
|
||||
_pools = None
|
||||
_load_balancer = None
|
||||
@@ -977,7 +988,7 @@ class Session(object):
|
||||
finally:
|
||||
if trace:
|
||||
try:
|
||||
query.trace = future.get_query_trace()
|
||||
query.trace = future.get_query_trace(self.max_trace_wait)
|
||||
except Exception:
|
||||
log.exception("Unable to fetch query trace:")
|
||||
|
||||
@@ -2162,17 +2173,19 @@ class ResponseFuture(object):
|
||||
else:
|
||||
raise OperationTimedOut()
|
||||
|
||||
def get_query_trace(self):
|
||||
def get_query_trace(self, max_wait=None):
|
||||
"""
|
||||
Returns the :class:`~.query.QueryTrace` instance representing a trace
|
||||
of the last attempt for this operation, or :const:`None` if tracing was
|
||||
not enabled for this query. Note that this may raise an exception if
|
||||
there are problems retrieving the trace details from Cassandra.
|
||||
there are problems retrieving the trace details from Cassandra. If the
|
||||
trace is not available after `max_wait` seconds,
|
||||
:exc:`cassandra.query.TraceUnavailable` will be raised.
|
||||
"""
|
||||
if not self._query_trace:
|
||||
return None
|
||||
|
||||
self._query_trace.populate()
|
||||
self._query_trace.populate(max_wait)
|
||||
return self._query_trace
|
||||
|
||||
def add_callback(self, fn, *args, **kwargs):
|
||||
|
||||
@@ -13,6 +13,9 @@ from cassandra.cqltypes import unix_time_from_uuid1
|
||||
from cassandra.decoder import (cql_encoders, cql_encode_object,
|
||||
cql_encode_sequence)
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Statement(object):
|
||||
"""
|
||||
@@ -386,27 +389,29 @@ class QueryTrace(object):
|
||||
_SELECT_SESSIONS_FORMAT = "SELECT * FROM system_traces.sessions WHERE session_id = %s"
|
||||
_SELECT_EVENTS_FORMAT = "SELECT * FROM system_traces.events WHERE session_id = %s"
|
||||
_BASE_RETRY_SLEEP = 0.003
|
||||
_MAX_ATTEMPTS = 5
|
||||
|
||||
def __init__(self, trace_id, session):
|
||||
self.trace_id = trace_id
|
||||
self._session = session
|
||||
|
||||
def populate(self):
|
||||
def populate(self, max_wait=2.0):
|
||||
"""
|
||||
Retrieves the actual tracing details from Cassandra and populates the
|
||||
attributes of this instance. Because tracing details are stored
|
||||
asynchronously by Cassandra, this may need to retry the session
|
||||
detail fetch up to five times before raising :exc:`.TraceUnavailable`.
|
||||
|
||||
Currently intended for internal use only.
|
||||
detail fetch. If the trace is still not available after `max_wait`
|
||||
seconds, :exc:`.TraceUnavailable` will be raised; if `max_wait` is
|
||||
:const:`None`, this will retry forever.
|
||||
"""
|
||||
attempt = 0
|
||||
while attempt <= self._MAX_ATTEMPTS:
|
||||
attempt += 1
|
||||
start = time.time()
|
||||
while True:
|
||||
if max_wait is not None and time.time() - start >= max_wait:
|
||||
raise TraceUnavailable("Trace information was not available within %f seconds" % (max_wait,))
|
||||
session_results = self._session.execute(self._SELECT_SESSIONS_FORMAT, (self.trace_id,))
|
||||
if not session_results or session_results[0].duration is None:
|
||||
time.sleep(self._BASE_RETRY_SLEEP * attempt)
|
||||
time.sleep(self._BASE_RETRY_SLEEP * (2 ** attempt))
|
||||
attempt += 1
|
||||
continue
|
||||
|
||||
session_row = session_results[0]
|
||||
@@ -419,6 +424,7 @@ class QueryTrace(object):
|
||||
event_results = self._session.execute(self._SELECT_EVENTS_FORMAT, (self.trace_id,))
|
||||
self.events = tuple(TraceEvent(r.activity, r.event_id, r.source, r.source_elapsed, r.thread)
|
||||
for r in event_results)
|
||||
break
|
||||
|
||||
def __str__(self):
|
||||
return "%s [%s] coordinator: %s, started at: %s, duration: %s, parameters: %s" \
|
||||
|
||||
@@ -4,7 +4,7 @@ except ImportError:
|
||||
import unittest # noqa
|
||||
|
||||
import cassandra
|
||||
from cassandra.query import SimpleStatement
|
||||
from cassandra.query import SimpleStatement, TraceUnavailable
|
||||
from cassandra.io.asyncorereactor import AsyncoreConnection
|
||||
from cassandra.policies import RoundRobinPolicy, ExponentialReconnectionPolicy, RetryPolicy, SimpleConvictionPolicy, HostDistance
|
||||
|
||||
@@ -214,6 +214,16 @@ class ClusterTests(unittest.TestCase):
|
||||
future.result()
|
||||
self.assertEqual(None, future.get_query_trace())
|
||||
|
||||
def test_trace_timeout(self):
|
||||
cluster = Cluster()
|
||||
session = cluster.connect()
|
||||
|
||||
query = "SELECT * FROM system.local"
|
||||
statement = SimpleStatement(query)
|
||||
future = session.execute_async(statement, trace=True)
|
||||
future.result()
|
||||
self.assertRaises(TraceUnavailable, future.get_query_trace, -1.0)
|
||||
|
||||
def test_string_coverage(self):
|
||||
"""
|
||||
Ensure str(future) returns without error
|
||||
|
||||
@@ -8,6 +8,7 @@ from cassandra.cluster import Cluster
|
||||
|
||||
|
||||
class QueryTest(unittest.TestCase):
|
||||
|
||||
def test_query(self):
|
||||
cluster = Cluster()
|
||||
session = cluster.connect()
|
||||
|
||||
Reference in New Issue
Block a user