diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b5ab287b..5ee880bb 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,14 @@ +1.0.2 +===== +In Progress + +Bug Fixes +--------- +* With asyncorereactor, correctly handle EAGAIN/EWOULDBLOCK when the message from + Cassandra is a multiple of the read buffer size. Previously, if no more data + became available to read on the socket, the message would never be processed, + resulting in an OperationTimedOut error. + 1.0.1 ===== Feb 19, 2014 diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 6fefa49d..6f80e48a 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -260,7 +260,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): except socket.error as err: if err.args[0] not in NONBLOCKING: self.defunct(err) - return + return if self._iobuf.tell(): while True: diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py index a8eaf5c3..8b5db2e1 100644 --- a/tests/unit/io/test_asyncorereactor.py +++ b/tests/unit/io/test_asyncorereactor.py @@ -4,6 +4,7 @@ except ImportError: import unittest # noqa import errno +import os from StringIO import StringIO import socket from socket import error as socket_error @@ -16,7 +17,7 @@ from cassandra.connection import (PROTOCOL_VERSION, from cassandra.decoder import (write_stringmultimap, write_int, write_string, SupportedMessage, ReadyMessage, ServerError) -from cassandra.marshal import uint8_pack, uint32_pack +from cassandra.marshal import uint8_pack, uint32_pack, int32_pack from cassandra.io.asyncorereactor import AsyncoreConnection @@ -85,6 +86,40 @@ class AsyncoreConnectionTest(unittest.TestCase): c.handle_read() self.assertTrue(c.connected_event.is_set()) + return c + + def test_egain_on_buffer_size(self, *args): + # get a connection that's already fully started + c = self.test_successful_connection() + + header = '\x00\x00\x00\x00' + int32_pack(20000) + responses = [ + header + ('a' * (4096 - len(header))), + 'a' * 4096, + socket_error(errno.EAGAIN), + 'a' * 100, + socket_error(errno.EAGAIN)] + + def side_effect(*args): + response = responses.pop(0) + if isinstance(response, socket_error): + raise response + else: + return response + + c.socket.recv.side_effect = side_effect + c.handle_read() + self.assertEquals(c._total_reqd_bytes, 20000 + len(header)) + # the EAGAIN prevents it from reading the last 100 bytes + c._iobuf.seek(0, os.SEEK_END) + pos = c._iobuf.tell() + self.assertEquals(pos, 4096 + 4096) + + # now tell it to read the last 100 bytes + c.handle_read() + c._iobuf.seek(0, os.SEEK_END) + pos = c._iobuf.tell() + self.assertEquals(pos, 4096 + 4096 + 100) def test_protocol_error(self, *args): c = self.make_connection()