Unit tests for PreparedQueryNotFound handling, related fixes
This commit is contained in:
@@ -1170,6 +1170,20 @@ class ResponseFuture(object):
|
||||
... print "operation failed!"
|
||||
|
||||
"""
|
||||
session = None
|
||||
row_factory = None
|
||||
message = None
|
||||
query = None
|
||||
|
||||
_req_id = None
|
||||
_final_result = _NO_RESULT_YET
|
||||
_final_exception = None
|
||||
_callback = None
|
||||
_errback = None
|
||||
_current_host = None
|
||||
_current_pool = None
|
||||
_connection = None
|
||||
_query_retries = 0
|
||||
|
||||
def __init__(self, session, message, query):
|
||||
self.session = session
|
||||
@@ -1182,14 +1196,8 @@ class ResponseFuture(object):
|
||||
# they last left off
|
||||
self.query_plan = iter(session._load_balancer.make_query_plan(query))
|
||||
|
||||
self._req_id = None
|
||||
self._final_result = _NO_RESULT_YET
|
||||
self._final_exception = None
|
||||
self._current_host = self._current_pool = self._connection = None
|
||||
self._event = Event()
|
||||
self._errors = {}
|
||||
self._query_retries = 0
|
||||
self._callback = self._errback = None
|
||||
|
||||
def __del__(self):
|
||||
if hasattr(self, 'session'):
|
||||
@@ -1282,12 +1290,13 @@ class ResponseFuture(object):
|
||||
self._retry(reuse_connection=False, consistency_level=None)
|
||||
return
|
||||
elif isinstance(response, PreparedQueryNotFound):
|
||||
query_id = response.result
|
||||
md5_id = response.info
|
||||
try:
|
||||
prepared_statement = self.session.cluster._prepared_statements[query_id]
|
||||
prepared_statement = self.session.cluster._prepared_statements[md5_id]
|
||||
except KeyError:
|
||||
log.error("Tried to execute unknown prepared statement %d" % (query_id,))
|
||||
log.error("Tried to execute unknown prepared statement %d" % (md5_id,))
|
||||
self._set_final_exception(response)
|
||||
return
|
||||
|
||||
current_keyspace = self._connection.keyspace
|
||||
prepared_keyspace = prepared_statement.keyspace
|
||||
@@ -1297,13 +1306,15 @@ class ResponseFuture(object):
|
||||
"not match the keyspace the statement was "
|
||||
"prepared with (%s)" %
|
||||
(current_keyspace, prepared_keyspace)))
|
||||
return
|
||||
|
||||
prepare_message = PrepareMessage(query=self.query.query_string)
|
||||
prepare_message = PrepareMessage(query=prepared_statement.query_string)
|
||||
# since this might block, run on the executor to avoid hanging
|
||||
# the event loop thread
|
||||
self.session.submit(self._connection.send_msg,
|
||||
prepare_message,
|
||||
cb=self._execute_after_prepare)
|
||||
return
|
||||
else:
|
||||
self._set_final_exception(response)
|
||||
return
|
||||
|
@@ -278,8 +278,8 @@ class PreparedQueryNotFound(RequestValidationException):
|
||||
|
||||
@staticmethod
|
||||
def recv_error_info(f):
|
||||
# return the prepared query ID
|
||||
return read_short(f)
|
||||
# return the MD5 ID for the prepared query
|
||||
return f.read(16)
|
||||
|
||||
class AlreadyExistsException(ConfigurationException):
|
||||
summary = 'Item already exists'
|
||||
|
@@ -1,12 +1,13 @@
|
||||
import unittest
|
||||
from mock import Mock, ANY
|
||||
from mock import Mock, MagicMock, ANY
|
||||
|
||||
from cassandra import ConsistencyLevel
|
||||
from cassandra.cluster import Session, ResponseFuture, NoHostAvailable
|
||||
from cassandra.connection import ConnectionException
|
||||
from cassandra.decoder import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage,
|
||||
UnavailableErrorMessage, ResultMessage, QueryMessage,
|
||||
OverloadedErrorMessage, IsBootstrappingErrorMessage)
|
||||
OverloadedErrorMessage, IsBootstrappingErrorMessage,
|
||||
PreparedQueryNotFound, PrepareMessage)
|
||||
from cassandra.policies import RetryPolicy
|
||||
from cassandra.pool import NoConnectionsAvailable
|
||||
from cassandra.query import SimpleStatement
|
||||
@@ -344,3 +345,37 @@ class ResponseFutureTests(unittest.TestCase):
|
||||
response = Mock(spec=ResultMessage, kind=ResultMessage.KIND_ROWS, results=[{'col': 'val'}])
|
||||
rf._set_result(response)
|
||||
self.assertEqual(rf.result(), [{'col': 'val'}])
|
||||
|
||||
def test_prepared_query_not_found(self):
|
||||
session = self.make_session()
|
||||
rf = self.make_response_future(session)
|
||||
rf.send_request()
|
||||
|
||||
session.cluster._prepared_statements = MagicMock(dict)
|
||||
prepared_statement = session.cluster._prepared_statements.__getitem__.return_value
|
||||
prepared_statement.query_string = "SELECT * FROM foobar"
|
||||
prepared_statement.keyspace = "FooKeyspace"
|
||||
rf._connection.keyspace = "FooKeyspace"
|
||||
|
||||
result = Mock(spec=PreparedQueryNotFound, info='a' * 16)
|
||||
rf._set_result(result)
|
||||
|
||||
session.submit.assert_called_once()
|
||||
args, kwargs = session.submit.call_args
|
||||
self.assertIsInstance(args[-1], PrepareMessage)
|
||||
self.assertEquals(args[-1].query, "SELECT * FROM foobar")
|
||||
|
||||
def test_prepared_query_not_found_bad_keyspace(self):
|
||||
session = self.make_session()
|
||||
rf = self.make_response_future(session)
|
||||
rf.send_request()
|
||||
|
||||
session.cluster._prepared_statements = MagicMock(dict)
|
||||
prepared_statement = session.cluster._prepared_statements.__getitem__.return_value
|
||||
prepared_statement.query_string = "SELECT * FROM foobar"
|
||||
prepared_statement.keyspace = "FooKeyspace"
|
||||
rf._connection.keyspace = "BarKeyspace"
|
||||
|
||||
result = Mock(spec=PreparedQueryNotFound, info='a' * 16)
|
||||
rf._set_result(result)
|
||||
self.assertRaises(ValueError, rf.result)
|
||||
|
Reference in New Issue
Block a user