Also fix EAGAIN handling of large msgs for asyncore

This commit is contained in:
Tyler Hobbs
2014-02-21 12:11:11 -06:00
parent 2fd49c251f
commit cc1489c925
3 changed files with 48 additions and 2 deletions

View File

@@ -1,3 +1,14 @@
1.0.2
=====
In Progress
Bug Fixes
---------
* With asyncorereactor, correctly handle EAGAIN/EWOULDBLOCK when the message from
Cassandra is a multiple of the read buffer size. Previously, if no more data
became available to read on the socket, the message would never be processed,
resulting in an OperationTimedOut error.
1.0.1 1.0.1
===== =====
Feb 19, 2014 Feb 19, 2014

View File

@@ -260,7 +260,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
except socket.error as err: except socket.error as err:
if err.args[0] not in NONBLOCKING: if err.args[0] not in NONBLOCKING:
self.defunct(err) self.defunct(err)
return return
if self._iobuf.tell(): if self._iobuf.tell():
while True: while True:

View File

@@ -4,6 +4,7 @@ except ImportError:
import unittest # noqa import unittest # noqa
import errno import errno
import os
from StringIO import StringIO from StringIO import StringIO
import socket import socket
from socket import error as socket_error from socket import error as socket_error
@@ -16,7 +17,7 @@ from cassandra.connection import (PROTOCOL_VERSION,
from cassandra.decoder import (write_stringmultimap, write_int, write_string, from cassandra.decoder import (write_stringmultimap, write_int, write_string,
SupportedMessage, ReadyMessage, ServerError) SupportedMessage, ReadyMessage, ServerError)
from cassandra.marshal import uint8_pack, uint32_pack from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
from cassandra.io.asyncorereactor import AsyncoreConnection from cassandra.io.asyncorereactor import AsyncoreConnection
@@ -85,6 +86,40 @@ class AsyncoreConnectionTest(unittest.TestCase):
c.handle_read() c.handle_read()
self.assertTrue(c.connected_event.is_set()) self.assertTrue(c.connected_event.is_set())
return c
def test_egain_on_buffer_size(self, *args):
# get a connection that's already fully started
c = self.test_successful_connection()
header = '\x00\x00\x00\x00' + int32_pack(20000)
responses = [
header + ('a' * (4096 - len(header))),
'a' * 4096,
socket_error(errno.EAGAIN),
'a' * 100,
socket_error(errno.EAGAIN)]
def side_effect(*args):
response = responses.pop(0)
if isinstance(response, socket_error):
raise response
else:
return response
c.socket.recv.side_effect = side_effect
c.handle_read()
self.assertEquals(c._total_reqd_bytes, 20000 + len(header))
# the EAGAIN prevents it from reading the last 100 bytes
c._iobuf.seek(0, os.SEEK_END)
pos = c._iobuf.tell()
self.assertEquals(pos, 4096 + 4096)
# now tell it to read the last 100 bytes
c.handle_read()
c._iobuf.seek(0, os.SEEK_END)
pos = c._iobuf.tell()
self.assertEquals(pos, 4096 + 4096 + 100)
def test_protocol_error(self, *args): def test_protocol_error(self, *args):
c = self.make_connection() c = self.make_connection()