Unblocking broker aware request
This commit is contained in:

committed by
Enrico Canzonieri

parent
e99a934bab
commit
04920bb89f
@@ -2,6 +2,7 @@ import collections
|
||||
import copy
|
||||
import functools
|
||||
import logging
|
||||
import select
|
||||
import time
|
||||
|
||||
import kafka.common
|
||||
@@ -177,6 +178,10 @@ class KafkaClient(object):
|
||||
# For each broker, send the list of request payloads
|
||||
# and collect the responses and errors
|
||||
broker_failures = []
|
||||
|
||||
# For each KafkaConnection we store the real socket so that we can use
|
||||
# a select to perform unblocking I/O
|
||||
socket_connection = {}
|
||||
for broker, payloads in payloads_by_broker.items():
|
||||
requestId = self._next_id()
|
||||
log.debug('Request %s to %s: %s', requestId, broker, payloads)
|
||||
@@ -210,27 +215,34 @@ class KafkaClient(object):
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = None
|
||||
continue
|
||||
|
||||
try:
|
||||
response = conn.recv(requestId)
|
||||
except ConnectionError as e:
|
||||
broker_failures.append(broker)
|
||||
log.warning('ConnectionError attempting to receive a '
|
||||
'response to request %s from server %s: %s',
|
||||
requestId, broker, e)
|
||||
|
||||
for payload in payloads:
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = FailedPayloadsError(payload)
|
||||
|
||||
else:
|
||||
_resps = []
|
||||
for payload_response in decoder_fn(response):
|
||||
topic_partition = (payload_response.topic,
|
||||
payload_response.partition)
|
||||
responses[topic_partition] = payload_response
|
||||
_resps.append(payload_response)
|
||||
log.debug('Response %s: %s', requestId, _resps)
|
||||
socket_connection[conn.get_connected_socket()] = (conn, broker)
|
||||
|
||||
conn = None
|
||||
while socket_connection:
|
||||
sockets = socket_connection.keys()
|
||||
rlist, _, _ = select.select(sockets, [], [], None)
|
||||
conn, broker = socket_connection.pop(rlist[0])
|
||||
try:
|
||||
response = conn.recv(requestId)
|
||||
except ConnectionError as e:
|
||||
broker_failures.append(broker)
|
||||
log.warning('ConnectionError attempting to receive a '
|
||||
'response to request %s from server %s: %s',
|
||||
requestId, broker, e)
|
||||
|
||||
for payload in payloads:
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = FailedPayloadsError(payload)
|
||||
|
||||
else:
|
||||
_resps = []
|
||||
for payload_response in decoder_fn(response):
|
||||
topic_partition = (payload_response.topic,
|
||||
payload_response.partition)
|
||||
responses[topic_partition] = payload_response
|
||||
_resps.append(payload_response)
|
||||
log.debug('Response %s: %s', requestId, _resps)
|
||||
|
||||
# Connection errors generally mean stale metadata
|
||||
# although sometimes it means incorrect api request
|
||||
|
@@ -118,6 +118,11 @@ class KafkaConnection(local):
|
||||
|
||||
# TODO multiplex socket communication to allow for multi-threaded clients
|
||||
|
||||
def get_connected_socket(self):
|
||||
if not self._sock:
|
||||
self.reinit()
|
||||
return self._sock
|
||||
|
||||
def send(self, request_id, payload):
|
||||
"""
|
||||
Send a request to Kafka
|
||||
|
Reference in New Issue
Block a user