Add tests. Bug fix. Rename socket_conn dict.
This commit is contained in:

committed by
Enrico Canzonieri

parent
04920bb89f
commit
c2adeeab05
@@ -179,9 +179,9 @@ class KafkaClient(object):
|
|||||||
# and collect the responses and errors
|
# and collect the responses and errors
|
||||||
broker_failures = []
|
broker_failures = []
|
||||||
|
|
||||||
# For each KafkaConnection we store the real socket so that we can use
|
# For each KafkaConnection keep the real socket so that we can use
|
||||||
# a select to perform unblocking I/O
|
# a select to perform unblocking I/O
|
||||||
socket_connection = {}
|
connections_by_socket = {}
|
||||||
for broker, payloads in payloads_by_broker.items():
|
for broker, payloads in payloads_by_broker.items():
|
||||||
requestId = self._next_id()
|
requestId = self._next_id()
|
||||||
log.debug('Request %s to %s: %s', requestId, broker, payloads)
|
log.debug('Request %s to %s: %s', requestId, broker, payloads)
|
||||||
@@ -216,13 +216,13 @@ class KafkaClient(object):
|
|||||||
responses[topic_partition] = None
|
responses[topic_partition] = None
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
socket_connection[conn.get_connected_socket()] = (conn, broker)
|
connections_by_socket[conn.get_connected_socket()] = (conn, broker)
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
while socket_connection:
|
while connections_by_socket:
|
||||||
sockets = socket_connection.keys()
|
sockets = connections_by_socket.keys()
|
||||||
rlist, _, _ = select.select(sockets, [], [], None)
|
rlist, _, _ = select.select(sockets, [], [], None)
|
||||||
conn, broker = socket_connection.pop(rlist[0])
|
conn, broker = connections_by_socket.pop(rlist[0])
|
||||||
try:
|
try:
|
||||||
response = conn.recv(requestId)
|
response = conn.recv(requestId)
|
||||||
except ConnectionError as e:
|
except ConnectionError as e:
|
||||||
@@ -231,7 +231,7 @@ class KafkaClient(object):
|
|||||||
'response to request %s from server %s: %s',
|
'response to request %s from server %s: %s',
|
||||||
requestId, broker, e)
|
requestId, broker, e)
|
||||||
|
|
||||||
for payload in payloads:
|
for payload in payloads_by_broker[broker]:
|
||||||
topic_partition = (payload.topic, payload.partition)
|
topic_partition = (payload.topic, payload.partition)
|
||||||
responses[topic_partition] = FailedPayloadsError(payload)
|
responses[topic_partition] = FailedPayloadsError(payload)
|
||||||
|
|
||||||
|
@@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase):
|
|||||||
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
|
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
|
||||||
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
|
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
|
||||||
|
|
||||||
|
def test_get_connected_socket(self):
|
||||||
|
s = self.conn.get_connected_socket()
|
||||||
|
|
||||||
|
self.assertEqual(s, self.MockCreateConn())
|
||||||
|
|
||||||
|
def test_get_connected_socket_on_dirty_conn(self):
|
||||||
|
# Dirty the connection
|
||||||
|
try:
|
||||||
|
self.conn._raise_connection_error()
|
||||||
|
except ConnectionError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Test that get_connected_socket tries to connect
|
||||||
|
self.assertEqual(self.MockCreateConn.call_count, 0)
|
||||||
|
self.conn.get_connected_socket()
|
||||||
|
self.assertEqual(self.MockCreateConn.call_count, 1)
|
||||||
|
|
||||||
def test_close__object_is_reusable(self):
|
def test_close__object_is_reusable(self):
|
||||||
|
|
||||||
# test that sending to a closed connection
|
# test that sending to a closed connection
|
||||||
|
Reference in New Issue
Block a user