diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 0b78c533..14a74d51 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1170,6 +1170,20 @@ class ResponseFuture(object): ... print "operation failed!" """ + session = None + row_factory = None + message = None + query = None + + _req_id = None + _final_result = _NO_RESULT_YET + _final_exception = None + _callback = None + _errback = None + _current_host = None + _current_pool = None + _connection = None + _query_retries = 0 def __init__(self, session, message, query): self.session = session @@ -1182,14 +1196,8 @@ class ResponseFuture(object): # they last left off self.query_plan = iter(session._load_balancer.make_query_plan(query)) - self._req_id = None - self._final_result = _NO_RESULT_YET - self._final_exception = None - self._current_host = self._current_pool = self._connection = None self._event = Event() self._errors = {} - self._query_retries = 0 - self._callback = self._errback = None def __del__(self): if hasattr(self, 'session'): @@ -1282,12 +1290,13 @@ class ResponseFuture(object): self._retry(reuse_connection=False, consistency_level=None) return elif isinstance(response, PreparedQueryNotFound): - query_id = response.result + md5_id = response.info try: - prepared_statement = self.session.cluster._prepared_statements[query_id] + prepared_statement = self.session.cluster._prepared_statements[md5_id] except KeyError: - log.error("Tried to execute unknown prepared statement %d" % (query_id,)) + log.error("Tried to execute unknown prepared statement %d" % (md5_id,)) self._set_final_exception(response) + return current_keyspace = self._connection.keyspace prepared_keyspace = prepared_statement.keyspace @@ -1297,13 +1306,15 @@ class ResponseFuture(object): "not match the keyspace the statement was " "prepared with (%s)" % (current_keyspace, prepared_keyspace))) + return - prepare_message = PrepareMessage(query=self.query.query_string) + prepare_message = PrepareMessage(query=prepared_statement.query_string) # since this might block, run on the executor to avoid hanging # the event loop thread self.session.submit(self._connection.send_msg, prepare_message, cb=self._execute_after_prepare) + return else: self._set_final_exception(response) return diff --git a/cassandra/decoder.py b/cassandra/decoder.py index 515f0874..e52f7456 100644 --- a/cassandra/decoder.py +++ b/cassandra/decoder.py @@ -278,8 +278,8 @@ class PreparedQueryNotFound(RequestValidationException): @staticmethod def recv_error_info(f): - # return the prepared query ID - return read_short(f) + # return the MD5 ID for the prepared query + return f.read(16) class AlreadyExistsException(ConfigurationException): summary = 'Item already exists' diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 5d2cbbb8..bc1e0fa4 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -1,12 +1,13 @@ import unittest -from mock import Mock, ANY +from mock import Mock, MagicMock, ANY from cassandra import ConsistencyLevel from cassandra.cluster import Session, ResponseFuture, NoHostAvailable from cassandra.connection import ConnectionException from cassandra.decoder import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage, UnavailableErrorMessage, ResultMessage, QueryMessage, - OverloadedErrorMessage, IsBootstrappingErrorMessage) + OverloadedErrorMessage, IsBootstrappingErrorMessage, + PreparedQueryNotFound, PrepareMessage) from cassandra.policies import RetryPolicy from cassandra.pool import NoConnectionsAvailable from cassandra.query import SimpleStatement @@ -344,3 +345,37 @@ class ResponseFutureTests(unittest.TestCase): response = Mock(spec=ResultMessage, kind=ResultMessage.KIND_ROWS, results=[{'col': 'val'}]) rf._set_result(response) self.assertEqual(rf.result(), [{'col': 'val'}]) + + def test_prepared_query_not_found(self): + session = self.make_session() + rf = self.make_response_future(session) + rf.send_request() + + session.cluster._prepared_statements = MagicMock(dict) + prepared_statement = session.cluster._prepared_statements.__getitem__.return_value + prepared_statement.query_string = "SELECT * FROM foobar" + prepared_statement.keyspace = "FooKeyspace" + rf._connection.keyspace = "FooKeyspace" + + result = Mock(spec=PreparedQueryNotFound, info='a' * 16) + rf._set_result(result) + + session.submit.assert_called_once() + args, kwargs = session.submit.call_args + self.assertIsInstance(args[-1], PrepareMessage) + self.assertEquals(args[-1].query, "SELECT * FROM foobar") + + def test_prepared_query_not_found_bad_keyspace(self): + session = self.make_session() + rf = self.make_response_future(session) + rf.send_request() + + session.cluster._prepared_statements = MagicMock(dict) + prepared_statement = session.cluster._prepared_statements.__getitem__.return_value + prepared_statement.query_string = "SELECT * FROM foobar" + prepared_statement.keyspace = "FooKeyspace" + rf._connection.keyspace = "BarKeyspace" + + result = Mock(spec=PreparedQueryNotFound, info='a' * 16) + rf._set_result(result) + self.assertRaises(ValueError, rf.result)