RetryPolicy tests, related fixes
This commit is contained in:
@@ -83,15 +83,15 @@ class ResponseFuture(object):
|
||||
if isinstance(response, ReadTimeoutErrorMessage):
|
||||
details = response.recv_error_info()
|
||||
retry = retry_policy.on_read_timeout(
|
||||
self.query, attempt_num=self._query_retries, **details)
|
||||
self.query, retry_num=self._query_retries, **details)
|
||||
elif isinstance(response, WriteTimeoutErrorMessage):
|
||||
details = response.recv_error_info()
|
||||
retry = retry_policy.on_write_timeout(
|
||||
self.query, attempt_num=self._query_retries, **details)
|
||||
self.query, retry_num=self._query_retries, **details)
|
||||
elif isinstance(response, UnavailableExceptionErrorMessage):
|
||||
details = response.recv_error_info()
|
||||
retry = retry_policy.on_write_timeout(
|
||||
self.query, attempt_num=self._query_retries, **details)
|
||||
self.query, retry_num=self._query_retries, **details)
|
||||
elif isinstance(response, OverloadedErrorMessage):
|
||||
# need to retry against a different host here
|
||||
self._retry(False, None)
|
||||
|
||||
@@ -177,8 +177,8 @@ class RetryPolicy(object):
|
||||
IGNORE = 2
|
||||
|
||||
def on_read_timeout(self, query, consistency, required_responses,
|
||||
received_responses, data_retrieved, attempt_num):
|
||||
if attempt_num != 0:
|
||||
received_responses, data_retrieved, retry_num):
|
||||
if retry_num != 0:
|
||||
return (self.RETHROW, None)
|
||||
elif received_responses >= required_responses and not data_retrieved:
|
||||
return (self.RETRY, consistency)
|
||||
@@ -186,15 +186,15 @@ class RetryPolicy(object):
|
||||
return (self.RETHROW, None)
|
||||
|
||||
def on_write_timeout(self, query, consistency, write_type,
|
||||
required_responses, received_responses, attempt_num):
|
||||
if attempt_num != 0:
|
||||
required_responses, received_responses, retry_num):
|
||||
if retry_num != 0:
|
||||
return (self.RETHROW, None)
|
||||
elif write_type == WriteType.BATCH_LOG:
|
||||
return (self.RETRY, consistency)
|
||||
else:
|
||||
return (self.RETHROW, None)
|
||||
|
||||
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, attempt_num):
|
||||
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
|
||||
return (self.RETHROW, None)
|
||||
|
||||
|
||||
@@ -210,21 +210,21 @@ class FallthroughRetryPolicy(RetryPolicy):
|
||||
return (self.RETHROW, None)
|
||||
|
||||
|
||||
class FallthroughRetryPolicy(RetryPolicy):
|
||||
class DowngradingConsistencyRetryPolicy(RetryPolicy):
|
||||
|
||||
def _pick_consistency(self, num_responses):
|
||||
if num_responses >= 3:
|
||||
return (self.RETRY, ConsistencyLevel.name_to_value["THREE"])
|
||||
return (self.RETRY, ConsistencyLevel.THREE)
|
||||
elif num_responses >= 2:
|
||||
return (self.RETRY, ConsistencyLevel.name_to_value["TWO"])
|
||||
return (self.RETRY, ConsistencyLevel.TWO)
|
||||
elif num_responses >= 1:
|
||||
return (self.RETRY, ConsistencyLevel.name_to_value["ONE"])
|
||||
return (self.RETRY, ConsistencyLevel.ONE)
|
||||
else:
|
||||
return (self.RETHROW, None)
|
||||
|
||||
def on_read_timeout(self, query, consistency, required_responses,
|
||||
received_responses, data_retrieved, attempt_num):
|
||||
if attempt_num != 0:
|
||||
received_responses, data_retrieved, retry_num):
|
||||
if retry_num != 0:
|
||||
return (self.RETHROW, None)
|
||||
elif received_responses < required_responses:
|
||||
return self._pick_consistency(received_responses)
|
||||
@@ -234,11 +234,11 @@ class FallthroughRetryPolicy(RetryPolicy):
|
||||
return (self.RETHROW, None)
|
||||
|
||||
def on_write_timeout(self, query, consistency, write_type,
|
||||
required_responses, received_responses, attempt_num):
|
||||
if attempt_num != 0:
|
||||
required_responses, received_responses, retry_num):
|
||||
if retry_num != 0:
|
||||
return (self.RETHROW, None)
|
||||
elif write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER):
|
||||
return (self.IGNORED, None)
|
||||
return (self.IGNORE, None)
|
||||
elif write_type == WriteType.UNLOGGED_BATCH:
|
||||
return self._pick_consistency(received_responses)
|
||||
elif write_type == WriteType.BATCH_LOG:
|
||||
@@ -246,8 +246,8 @@ class FallthroughRetryPolicy(RetryPolicy):
|
||||
else:
|
||||
return (self.RETHROW, None)
|
||||
|
||||
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, attempt_num):
|
||||
if attempt_num != 0:
|
||||
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
|
||||
if retry_num != 0:
|
||||
return (self.RETHROW, None)
|
||||
else:
|
||||
return self._pick_consistency(alive_replicas)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import unittest
|
||||
from threading import Thread
|
||||
|
||||
from cassandra.decoder import ConsistencyLevel
|
||||
from cassandra.policies import (RoundRobinPolicy, DCAwareRoundRobinPolicy,
|
||||
SimpleConvictionPolicy, HostDistance,
|
||||
ExponentialReconnectionPolicy)
|
||||
ExponentialReconnectionPolicy, RetryPolicy,
|
||||
WriteType, DowngradingConsistencyRetryPolicy)
|
||||
from cassandra.pool import Host
|
||||
|
||||
class TestRoundRobinPolicy(unittest.TestCase):
|
||||
@@ -158,3 +160,144 @@ class ExponentialReconnectionPolicyTest(unittest.TestCase):
|
||||
self.assertEqual(delay, schedule[i - 1] * 2)
|
||||
else:
|
||||
self.assertEqual(delay, 100)
|
||||
|
||||
|
||||
class RetryPolicyTest(unittest.TestCase):
|
||||
|
||||
def test_read_timeout(self):
|
||||
policy = RetryPolicy()
|
||||
|
||||
# if this is the second or greater attempt, rethrow
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=1, received_responses=2,
|
||||
data_retrieved=True, retry_num=1)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# if we didn't get enough responses, rethrow
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=2, received_responses=1,
|
||||
data_retrieved=True, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# if we got enough responses, but also got a data response, rethrow
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=2, received_responses=2,
|
||||
data_retrieved=True, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# we got enough reponses but no data response, so retry
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=2, received_responses=2,
|
||||
data_retrieved=False, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, "ONE")
|
||||
|
||||
def test_write_timeout(self):
|
||||
policy = RetryPolicy()
|
||||
|
||||
# if this is the second or greater attempt, rethrow
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=WriteType.SIMPLE,
|
||||
required_responses=1, received_responses=2, retry_num=1)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# if it's not a BATCH_LOG write, don't retry it
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=WriteType.SIMPLE,
|
||||
required_responses=1, received_responses=2, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# retry BATCH_LOG writes regardless of received responses
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=WriteType.BATCH_LOG,
|
||||
required_responses=10000, received_responses=1, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, "ONE")
|
||||
|
||||
|
||||
class DowngradingConsistencyRetryPolicyTest(unittest.TestCase):
|
||||
|
||||
def test_read_timeout(self):
|
||||
policy = DowngradingConsistencyRetryPolicy()
|
||||
|
||||
# if this is the second or greater attempt, rethrow
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=1, received_responses=2,
|
||||
data_retrieved=True, retry_num=1)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# if we didn't get enough responses, retry at a lower consistency
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=3, received_responses=2,
|
||||
data_retrieved=True, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, ConsistencyLevel.TWO)
|
||||
|
||||
# retry consistency level goes down based on the # of recv'd responses
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=3, received_responses=1,
|
||||
data_retrieved=True, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, ConsistencyLevel.ONE)
|
||||
|
||||
# if we got no responses, rethrow
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=3, received_responses=0,
|
||||
data_retrieved=True, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# if we got enough response but no data, retry
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=3, received_responses=3,
|
||||
data_retrieved=False, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
|
||||
# if we got enough responses, but also got a data response, rethrow
|
||||
retry, consistency = policy.on_read_timeout(
|
||||
query=None, consistency="ONE", required_responses=2, received_responses=2,
|
||||
data_retrieved=True, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
def test_write_timeout(self):
|
||||
policy = DowngradingConsistencyRetryPolicy()
|
||||
|
||||
# if this is the second or greater attempt, rethrow
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=WriteType.SIMPLE,
|
||||
required_responses=1, received_responses=2, retry_num=1)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# ignore failures on these types of writes
|
||||
for write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER):
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=write_type,
|
||||
required_responses=1, received_responses=2, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.IGNORE)
|
||||
|
||||
# downgrade consistency level on unlogged batch writes
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=WriteType.UNLOGGED_BATCH,
|
||||
required_responses=3, received_responses=1, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, ConsistencyLevel.ONE)
|
||||
|
||||
# retry batch log writes at the same consistency level
|
||||
retry, consistency = policy.on_write_timeout(
|
||||
query=None, consistency="ONE", write_type=WriteType.BATCH_LOG,
|
||||
required_responses=3, received_responses=1, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, "ONE")
|
||||
|
||||
def test_unavailable(self):
|
||||
policy = DowngradingConsistencyRetryPolicy()
|
||||
|
||||
# if this is the second or greater attempt, rethrow
|
||||
retry, consistency = policy.on_unavailable(
|
||||
query=None, consistency="ONE", required_replicas=3, alive_replicas=1, retry_num=1)
|
||||
self.assertEqual(retry, RetryPolicy.RETHROW)
|
||||
|
||||
# downgrade consistency on unavailable exceptions
|
||||
retry, consistency = policy.on_unavailable(
|
||||
query=None, consistency="ONE", required_replicas=3, alive_replicas=1, retry_num=0)
|
||||
self.assertEqual(retry, RetryPolicy.RETRY)
|
||||
self.assertEqual(consistency, ConsistencyLevel.ONE)
|
||||
|
||||
Reference in New Issue
Block a user