From 1af6b887f6e2d4a4c147eb3d13b554529e23b5e4 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 10 Apr 2013 15:11:47 -0500 Subject: [PATCH] RetryPolicy tests, related fixes --- cassandra/cluster.py | 6 +- cassandra/policies.py | 32 ++++----- tests/test_policies.py | 145 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 163 insertions(+), 20 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 57be5135..d0f6dbf1 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -83,15 +83,15 @@ class ResponseFuture(object): if isinstance(response, ReadTimeoutErrorMessage): details = response.recv_error_info() retry = retry_policy.on_read_timeout( - self.query, attempt_num=self._query_retries, **details) + self.query, retry_num=self._query_retries, **details) elif isinstance(response, WriteTimeoutErrorMessage): details = response.recv_error_info() retry = retry_policy.on_write_timeout( - self.query, attempt_num=self._query_retries, **details) + self.query, retry_num=self._query_retries, **details) elif isinstance(response, UnavailableExceptionErrorMessage): details = response.recv_error_info() retry = retry_policy.on_write_timeout( - self.query, attempt_num=self._query_retries, **details) + self.query, retry_num=self._query_retries, **details) elif isinstance(response, OverloadedErrorMessage): # need to retry against a different host here self._retry(False, None) diff --git a/cassandra/policies.py b/cassandra/policies.py index e990dd87..e39976f5 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -177,8 +177,8 @@ class RetryPolicy(object): IGNORE = 2 def on_read_timeout(self, query, consistency, required_responses, - received_responses, data_retrieved, attempt_num): - if attempt_num != 0: + received_responses, data_retrieved, retry_num): + if retry_num != 0: return (self.RETHROW, None) elif received_responses >= required_responses and not data_retrieved: return (self.RETRY, consistency) @@ -186,15 +186,15 @@ class RetryPolicy(object): return (self.RETHROW, None) def on_write_timeout(self, query, consistency, write_type, - required_responses, received_responses, attempt_num): - if attempt_num != 0: + required_responses, received_responses, retry_num): + if retry_num != 0: return (self.RETHROW, None) elif write_type == WriteType.BATCH_LOG: return (self.RETRY, consistency) else: return (self.RETHROW, None) - def on_unavailable(self, query, consistency, required_replicas, alive_replicas, attempt_num): + def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num): return (self.RETHROW, None) @@ -210,21 +210,21 @@ class FallthroughRetryPolicy(RetryPolicy): return (self.RETHROW, None) -class FallthroughRetryPolicy(RetryPolicy): +class DowngradingConsistencyRetryPolicy(RetryPolicy): def _pick_consistency(self, num_responses): if num_responses >= 3: - return (self.RETRY, ConsistencyLevel.name_to_value["THREE"]) + return (self.RETRY, ConsistencyLevel.THREE) elif num_responses >= 2: - return (self.RETRY, ConsistencyLevel.name_to_value["TWO"]) + return (self.RETRY, ConsistencyLevel.TWO) elif num_responses >= 1: - return (self.RETRY, ConsistencyLevel.name_to_value["ONE"]) + return (self.RETRY, ConsistencyLevel.ONE) else: return (self.RETHROW, None) def on_read_timeout(self, query, consistency, required_responses, - received_responses, data_retrieved, attempt_num): - if attempt_num != 0: + received_responses, data_retrieved, retry_num): + if retry_num != 0: return (self.RETHROW, None) elif received_responses < required_responses: return self._pick_consistency(received_responses) @@ -234,11 +234,11 @@ class FallthroughRetryPolicy(RetryPolicy): return (self.RETHROW, None) def on_write_timeout(self, query, consistency, write_type, - required_responses, received_responses, attempt_num): - if attempt_num != 0: + required_responses, received_responses, retry_num): + if retry_num != 0: return (self.RETHROW, None) elif write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER): - return (self.IGNORED, None) + return (self.IGNORE, None) elif write_type == WriteType.UNLOGGED_BATCH: return self._pick_consistency(received_responses) elif write_type == WriteType.BATCH_LOG: @@ -246,8 +246,8 @@ class FallthroughRetryPolicy(RetryPolicy): else: return (self.RETHROW, None) - def on_unavailable(self, query, consistency, required_replicas, alive_replicas, attempt_num): - if attempt_num != 0: + def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num): + if retry_num != 0: return (self.RETHROW, None) else: return self._pick_consistency(alive_replicas) diff --git a/tests/test_policies.py b/tests/test_policies.py index 88ce7b8f..e146f6f1 100644 --- a/tests/test_policies.py +++ b/tests/test_policies.py @@ -1,9 +1,11 @@ import unittest from threading import Thread +from cassandra.decoder import ConsistencyLevel from cassandra.policies import (RoundRobinPolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, HostDistance, - ExponentialReconnectionPolicy) + ExponentialReconnectionPolicy, RetryPolicy, + WriteType, DowngradingConsistencyRetryPolicy) from cassandra.pool import Host class TestRoundRobinPolicy(unittest.TestCase): @@ -158,3 +160,144 @@ class ExponentialReconnectionPolicyTest(unittest.TestCase): self.assertEqual(delay, schedule[i - 1] * 2) else: self.assertEqual(delay, 100) + + +class RetryPolicyTest(unittest.TestCase): + + def test_read_timeout(self): + policy = RetryPolicy() + + # if this is the second or greater attempt, rethrow + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=1, received_responses=2, + data_retrieved=True, retry_num=1) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # if we didn't get enough responses, rethrow + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=2, received_responses=1, + data_retrieved=True, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # if we got enough responses, but also got a data response, rethrow + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=2, received_responses=2, + data_retrieved=True, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # we got enough reponses but no data response, so retry + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=2, received_responses=2, + data_retrieved=False, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, "ONE") + + def test_write_timeout(self): + policy = RetryPolicy() + + # if this is the second or greater attempt, rethrow + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=WriteType.SIMPLE, + required_responses=1, received_responses=2, retry_num=1) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # if it's not a BATCH_LOG write, don't retry it + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=WriteType.SIMPLE, + required_responses=1, received_responses=2, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # retry BATCH_LOG writes regardless of received responses + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=WriteType.BATCH_LOG, + required_responses=10000, received_responses=1, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, "ONE") + + +class DowngradingConsistencyRetryPolicyTest(unittest.TestCase): + + def test_read_timeout(self): + policy = DowngradingConsistencyRetryPolicy() + + # if this is the second or greater attempt, rethrow + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=1, received_responses=2, + data_retrieved=True, retry_num=1) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # if we didn't get enough responses, retry at a lower consistency + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=3, received_responses=2, + data_retrieved=True, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, ConsistencyLevel.TWO) + + # retry consistency level goes down based on the # of recv'd responses + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=3, received_responses=1, + data_retrieved=True, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, ConsistencyLevel.ONE) + + # if we got no responses, rethrow + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=3, received_responses=0, + data_retrieved=True, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # if we got enough response but no data, retry + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=3, received_responses=3, + data_retrieved=False, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + + # if we got enough responses, but also got a data response, rethrow + retry, consistency = policy.on_read_timeout( + query=None, consistency="ONE", required_responses=2, received_responses=2, + data_retrieved=True, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETHROW) + + def test_write_timeout(self): + policy = DowngradingConsistencyRetryPolicy() + + # if this is the second or greater attempt, rethrow + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=WriteType.SIMPLE, + required_responses=1, received_responses=2, retry_num=1) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # ignore failures on these types of writes + for write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER): + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=write_type, + required_responses=1, received_responses=2, retry_num=0) + self.assertEqual(retry, RetryPolicy.IGNORE) + + # downgrade consistency level on unlogged batch writes + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=WriteType.UNLOGGED_BATCH, + required_responses=3, received_responses=1, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, ConsistencyLevel.ONE) + + # retry batch log writes at the same consistency level + retry, consistency = policy.on_write_timeout( + query=None, consistency="ONE", write_type=WriteType.BATCH_LOG, + required_responses=3, received_responses=1, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, "ONE") + + def test_unavailable(self): + policy = DowngradingConsistencyRetryPolicy() + + # if this is the second or greater attempt, rethrow + retry, consistency = policy.on_unavailable( + query=None, consistency="ONE", required_replicas=3, alive_replicas=1, retry_num=1) + self.assertEqual(retry, RetryPolicy.RETHROW) + + # downgrade consistency on unavailable exceptions + retry, consistency = policy.on_unavailable( + query=None, consistency="ONE", required_replicas=3, alive_replicas=1, retry_num=0) + self.assertEqual(retry, RetryPolicy.RETRY) + self.assertEqual(consistency, ConsistencyLevel.ONE)