From ec95133ef43ea6399bac102ba3ba7e614ece5a23 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 18 Feb 2014 15:07:43 -0600 Subject: [PATCH] libev: handle EAGAIN when message len matches buffer size --- CHANGELOG.rst | 4 ++++ cassandra/io/libevreactor.py | 2 +- tests/unit/io/test_libevreactor.py | 37 +++++++++++++++++++++++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3875521a..32784b5b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,10 @@ Bug Fixes * Always close socket when defuncting error'ed connections to avoid a potential file descriptor leak * Handle "custom" types (such as the replaced DateType) correctly +* With libevreactor, correctly handle EAGAIN/EWOULDBLOCK when the message from + Cassandra is a multiple of the read buffer size. Previously, if no more data + became available to read on the socket, the message would never be processed, + resulting in an OperationTimedOut error. Other ----- diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 35fc1771..3d298fb8 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -313,7 +313,7 @@ class LibevConnection(Connection): except socket.error as err: if err.args[0] not in NONBLOCKING: self.defunct(err) - return + return if self._iobuf.tell(): while True: diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index 875e3610..f7fab9fd 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -4,6 +4,7 @@ except ImportError: import unittest # noqa import errno +import os from StringIO import StringIO from socket import error as socket_error @@ -15,7 +16,7 @@ from cassandra.connection import (PROTOCOL_VERSION, from cassandra.decoder import (write_stringmultimap, write_int, write_string, SupportedMessage, ReadyMessage, ServerError) -from cassandra.marshal import uint8_pack, uint32_pack +from cassandra.marshal import uint8_pack, uint32_pack, int32_pack try: from cassandra.io.libevreactor import LibevConnection @@ -85,6 +86,40 @@ class LibevConnectionTest(unittest.TestCase): c.handle_read(None, 0) self.assertTrue(c.connected_event.is_set()) + return c + + def test_egain_on_buffer_size(self, *args): + # get a connection that's already fully started + c = self.test_successful_connection() + + header = '\x00\x00\x00\x00' + int32_pack(20000) + responses = [ + header + ('a' * (4096 - len(header))), + 'a' * 4096, + socket_error(errno.EAGAIN), + 'a' * 100, + socket_error(errno.EAGAIN)] + + def side_effect(*args): + response = responses.pop(0) + if isinstance(response, socket_error): + raise response + else: + return response + + c._socket.recv.side_effect = side_effect + c.handle_read(None, 0) + self.assertEquals(c._total_reqd_bytes, 20000 + len(header)) + # the EAGAIN prevents it from reading the last 100 bytes + c._iobuf.seek(0, os.SEEK_END) + pos = c._iobuf.tell() + self.assertEquals(pos, 4096 + 4096) + + # now tell it to read the last 100 bytes + c.handle_read(None, 0) + c._iobuf.seek(0, os.SEEK_END) + pos = c._iobuf.tell() + self.assertEquals(pos, 4096 + 4096 + 100) def test_protocol_error(self, *args): c = self.make_connection()