Refactor KafkaClient._send_broker_aware_request to return a list of responses
and include individual (unraised) FailedPayloadsError instances rather than always raising a FailedPayloadsError. This should allow producers to determine which payloads succeeded and which failed, specifically.
This commit is contained in:
130
kafka/client.py
130
kafka/client.py
@@ -151,7 +151,7 @@ class KafkaClient(object):
|
||||
"""
|
||||
|
||||
# Group the requests by topic+partition
|
||||
original_keys = []
|
||||
brokers_for_payloads = []
|
||||
payloads_by_broker = collections.defaultdict(list)
|
||||
|
||||
for payload in payloads:
|
||||
@@ -159,67 +159,88 @@ class KafkaClient(object):
|
||||
payload.partition)
|
||||
|
||||
payloads_by_broker[leader].append(payload)
|
||||
original_keys.append((payload.topic, payload.partition))
|
||||
|
||||
# Accumulate the responses in a dictionary
|
||||
acc = {}
|
||||
|
||||
# keep a list of payloads that were failed to be sent to brokers
|
||||
failed_payloads = []
|
||||
brokers_for_payloads.append(leader)
|
||||
|
||||
# For each broker, send the list of request payloads
|
||||
# and collect the responses and errors
|
||||
responses_by_broker = collections.defaultdict(list)
|
||||
broker_failures = []
|
||||
for broker, payloads in payloads_by_broker.items():
|
||||
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
|
||||
requestId = self._next_id()
|
||||
request = encoder_fn(client_id=self.client_id,
|
||||
correlation_id=requestId, payloads=payloads)
|
||||
|
||||
failed = False
|
||||
# Send the request, recv the response
|
||||
try:
|
||||
conn.send(requestId, request)
|
||||
|
||||
except ConnectionError as e:
|
||||
broker_failures.append(broker)
|
||||
log.warning("Could not send request [%s] to server %s: %s",
|
||||
binascii.b2a_hex(request), conn, e)
|
||||
|
||||
for payload in payloads:
|
||||
responses_by_broker[broker].append(FailedPayloadsError(payload))
|
||||
|
||||
# No exception, try to get response
|
||||
else:
|
||||
|
||||
# decoder_fn=None signal that the server is expected to not
|
||||
# send a response. This probably only applies to
|
||||
# ProduceRequest w/ acks = 0
|
||||
if decoder_fn is None:
|
||||
for payload in payloads:
|
||||
responses_by_broker[broker].append(None)
|
||||
continue
|
||||
|
||||
try:
|
||||
response = conn.recv(requestId)
|
||||
except ConnectionError as e:
|
||||
broker_failures.append(broker)
|
||||
log.warning("Could not receive response to request [%s] "
|
||||
"from server %s: %s", binascii.b2a_hex(request), conn, e)
|
||||
failed = True
|
||||
except ConnectionError as e:
|
||||
log.warning("Could not send request [%s] to server %s: %s",
|
||||
binascii.b2a_hex(request), conn, e)
|
||||
failed = True
|
||||
"from server %s: %s",
|
||||
binascii.b2a_hex(request), conn, e)
|
||||
|
||||
if failed:
|
||||
failed_payloads += payloads
|
||||
self.reset_all_metadata()
|
||||
continue
|
||||
for payload in payloads:
|
||||
responses_by_broker[broker].append(FailedPayloadsError(payload))
|
||||
|
||||
for response in decoder_fn(response):
|
||||
acc[(response.topic, response.partition)] = response
|
||||
else:
|
||||
|
||||
if failed_payloads:
|
||||
raise FailedPayloadsError(failed_payloads)
|
||||
for payload_response in decoder_fn(response):
|
||||
responses_by_broker[broker].append(payload_response)
|
||||
|
||||
# Order the accumulated responses by the original key order
|
||||
return (acc[k] for k in original_keys) if acc else ()
|
||||
# Connection errors generally mean stale metadata
|
||||
# although sometimes it means incorrect api request
|
||||
# Unfortunately there is no good way to tell the difference
|
||||
# so we'll just reset metadata on all errors to be safe
|
||||
if broker_failures:
|
||||
self.reset_all_metadata()
|
||||
|
||||
# Return responses in the same order as provided
|
||||
responses_by_payload = [responses_by_broker[broker].pop(0)
|
||||
for broker in brokers_for_payloads]
|
||||
return responses_by_payload
|
||||
|
||||
def __repr__(self):
|
||||
return '<KafkaClient client_id=%s>' % (self.client_id)
|
||||
|
||||
def _raise_on_response_error(self, resp):
|
||||
|
||||
# Response can be an unraised exception object (FailedPayloadsError)
|
||||
if isinstance(resp, Exception):
|
||||
raise resp
|
||||
|
||||
# Or a server api error response
|
||||
try:
|
||||
kafka.common.check_error(resp)
|
||||
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
|
||||
self.reset_topic_metadata(resp.topic)
|
||||
raise
|
||||
|
||||
# Return False if no error to enable list comprehensions
|
||||
return False
|
||||
|
||||
#################
|
||||
# Public API #
|
||||
#################
|
||||
@@ -419,16 +440,9 @@ class KafkaClient(object):
|
||||
|
||||
resps = self._send_broker_aware_request(payloads, encoder, decoder)
|
||||
|
||||
out = []
|
||||
for resp in resps:
|
||||
if fail_on_error is True:
|
||||
self._raise_on_response_error(resp)
|
||||
|
||||
if callback is not None:
|
||||
out.append(callback(resp))
|
||||
else:
|
||||
out.append(resp)
|
||||
return out
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if resp is not None and
|
||||
(not fail_on_error or not self._raise_on_response_error(resp))]
|
||||
|
||||
def send_fetch_request(self, payloads=[], fail_on_error=True,
|
||||
callback=None, max_wait_time=100, min_bytes=4096):
|
||||
@@ -447,16 +461,8 @@ class KafkaClient(object):
|
||||
payloads, encoder,
|
||||
KafkaProtocol.decode_fetch_response)
|
||||
|
||||
out = []
|
||||
for resp in resps:
|
||||
if fail_on_error is True:
|
||||
self._raise_on_response_error(resp)
|
||||
|
||||
if callback is not None:
|
||||
out.append(callback(resp))
|
||||
else:
|
||||
out.append(resp)
|
||||
return out
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if not fail_on_error or not self._raise_on_response_error(resp)]
|
||||
|
||||
def send_offset_request(self, payloads=[], fail_on_error=True,
|
||||
callback=None):
|
||||
@@ -465,15 +471,8 @@ class KafkaClient(object):
|
||||
KafkaProtocol.encode_offset_request,
|
||||
KafkaProtocol.decode_offset_response)
|
||||
|
||||
out = []
|
||||
for resp in resps:
|
||||
if fail_on_error is True:
|
||||
self._raise_on_response_error(resp)
|
||||
if callback is not None:
|
||||
out.append(callback(resp))
|
||||
else:
|
||||
out.append(resp)
|
||||
return out
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if not fail_on_error or not self._raise_on_response_error(resp)]
|
||||
|
||||
def send_offset_commit_request(self, group, payloads=[],
|
||||
fail_on_error=True, callback=None):
|
||||
@@ -482,16 +481,8 @@ class KafkaClient(object):
|
||||
decoder = KafkaProtocol.decode_offset_commit_response
|
||||
resps = self._send_broker_aware_request(payloads, encoder, decoder)
|
||||
|
||||
out = []
|
||||
for resp in resps:
|
||||
if fail_on_error is True:
|
||||
self._raise_on_response_error(resp)
|
||||
|
||||
if callback is not None:
|
||||
out.append(callback(resp))
|
||||
else:
|
||||
out.append(resp)
|
||||
return out
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if not fail_on_error or not self._raise_on_response_error(resp)]
|
||||
|
||||
def send_offset_fetch_request(self, group, payloads=[],
|
||||
fail_on_error=True, callback=None):
|
||||
@@ -501,12 +492,5 @@ class KafkaClient(object):
|
||||
decoder = KafkaProtocol.decode_offset_fetch_response
|
||||
resps = self._send_broker_aware_request(payloads, encoder, decoder)
|
||||
|
||||
out = []
|
||||
for resp in resps:
|
||||
if fail_on_error is True:
|
||||
self._raise_on_response_error(resp)
|
||||
if callback is not None:
|
||||
out.append(callback(resp))
|
||||
else:
|
||||
out.append(resp)
|
||||
return out
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if not fail_on_error or not self._raise_on_response_error(resp)]
|
||||
|
||||
Reference in New Issue
Block a user