Merge pull request #611 from dpkp/sock_send_bytes

Handle partial socket send()
This commit is contained in:
Dana Powers
2016-04-05 08:10:00 -07:00
3 changed files with 110 additions and 15 deletions

View File

@@ -188,10 +188,12 @@ class BrokerConnection(object):
# and send bytes asynchronously. For now, just block
# sending each request payload
self._sock.setblocking(True)
sent_bytes = self._sock.send(size)
assert sent_bytes == len(size)
sent_bytes = self._sock.send(message)
assert sent_bytes == len(message)
for data in (size, message):
total_sent = 0
while total_sent < len(data):
sent_bytes = self._sock.send(data[total_sent:])
total_sent += sent_bytes
assert total_sent == len(data)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)

View File

@@ -15,10 +15,10 @@ class Future(object):
self._errbacks = []
def succeeded(self):
return self.is_done and not self.exception
return self.is_done and not bool(self.exception)
def failed(self):
return self.is_done and self.exception
return self.is_done and bool(self.exception)
def retriable(self):
try:

View File

@@ -2,12 +2,15 @@
from __future__ import absolute_import
from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET
import socket
import time
import pytest
from kafka.conn import BrokerConnection, ConnectionStates
from kafka.protocol.api import RequestHeader
from kafka.protocol.metadata import MetadataRequest
import kafka.common as Errors
@pytest.fixture
@@ -20,6 +23,7 @@ def socket(mocker):
@pytest.fixture
def conn(socket):
from socket import AF_INET
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
return conn
@@ -61,22 +65,111 @@ def test_connect_timeout(socket, conn):
def test_blacked_out(conn):
assert not conn.blacked_out()
assert conn.blacked_out() is False
conn.last_attempt = time.time()
assert conn.blacked_out()
assert conn.blacked_out() is True
def test_connected(conn):
assert not conn.connected()
assert conn.connected() is False
conn.state = ConnectionStates.CONNECTED
assert conn.connected()
assert conn.connected() is True
def test_connecting(conn):
assert not conn.connecting()
assert conn.connecting() is False
conn.state = ConnectionStates.CONNECTING
assert conn.connecting()
assert conn.connecting() is True
conn.state = ConnectionStates.CONNECTED
assert not conn.connecting()
assert conn.connecting() is False
# TODO: test_send, test_recv, test_can_send_more, test_close
def test_send_disconnected(conn):
conn.state = ConnectionStates.DISCONNECTED
f = conn.send('foobar')
assert f.failed() is True
assert isinstance(f.exception, Errors.ConnectionError)
def test_send_connecting(conn):
conn.state = ConnectionStates.CONNECTING
f = conn.send('foobar')
assert f.failed() is True
assert isinstance(f.exception, Errors.NodeNotReadyError)
def test_send_max_ifr(conn):
conn.state = ConnectionStates.CONNECTED
max_ifrs = conn.config['max_in_flight_requests_per_connection']
for _ in range(max_ifrs):
conn.in_flight_requests.append('foo')
f = conn.send('foobar')
assert f.failed() is True
assert isinstance(f.exception, Errors.TooManyInFlightRequests)
def test_send_no_response(socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
req = MetadataRequest([])
header = RequestHeader(req, client_id=conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
third = payload_bytes // 3
remainder = payload_bytes % 3
socket.send.side_effect = [4, third, third, third, remainder]
assert len(conn.in_flight_requests) == 0
f = conn.send(req, expect_response=False)
assert f.succeeded() is True
assert f.value is None
assert len(conn.in_flight_requests) == 0
def test_send_response(socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
req = MetadataRequest([])
header = RequestHeader(req, client_id=conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
third = payload_bytes // 3
remainder = payload_bytes % 3
socket.send.side_effect = [4, third, third, third, remainder]
assert len(conn.in_flight_requests) == 0
f = conn.send(req)
assert f.is_done is False
assert len(conn.in_flight_requests) == 1
def test_send_error(socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
req = MetadataRequest([])
header = RequestHeader(req, client_id=conn.config['client_id'])
try:
error = ConnectionError
except NameError:
from socket import error
socket.send.side_effect = error
f = conn.send(req)
assert f.failed() is True
assert isinstance(f.exception, Errors.ConnectionError)
assert socket.close.call_count == 1
assert conn.state is ConnectionStates.DISCONNECTED
def test_can_send_more(conn):
assert conn.can_send_more() is True
max_ifrs = conn.config['max_in_flight_requests_per_connection']
for _ in range(max_ifrs):
assert conn.can_send_more() is True
conn.in_flight_requests.append('foo')
assert conn.can_send_more() is False
def test_recv(socket, conn):
pass # TODO
def test_close(conn):
pass # TODO