Check response.error for async producer
This commit is contained in:
@@ -16,8 +16,8 @@ import six
|
|||||||
|
|
||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
ProduceRequest, TopicAndPartition, RetryOptions,
|
ProduceRequest, TopicAndPartition, RetryOptions,
|
||||||
UnsupportedCodecError, FailedPayloadsError,
|
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
|
||||||
RequestTimedOutError, AsyncProducerQueueFull
|
RequestTimedOutError, AsyncProducerQueueFull, UnknownError
|
||||||
)
|
)
|
||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
|
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
|
||||||
@@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
|||||||
if not reqs:
|
if not reqs:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
reqs_to_retry, error_type = [], None
|
reqs_to_retry, error_cls = [], None
|
||||||
|
do_backoff, do_refresh = False, False
|
||||||
|
|
||||||
|
def _handle_error(error_cls, reqs, all_retries):
|
||||||
|
if ((error_cls == RequestTimedOutError and
|
||||||
|
retry_options.retry_on_timeouts) or
|
||||||
|
error_cls in RETRY_ERROR_TYPES):
|
||||||
|
all_retries += reqs
|
||||||
|
if error_cls in RETRY_BACKOFF_ERROR_TYPES:
|
||||||
|
do_backoff = True
|
||||||
|
if error_cls in RETRY_REFRESH_ERROR_TYPES:
|
||||||
|
do_refresh = True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reply = client.send_produce_request(reqs.keys(),
|
reply = client.send_produce_request(reqs.keys(),
|
||||||
acks=req_acks,
|
acks=req_acks,
|
||||||
timeout=ack_timeout,
|
timeout=ack_timeout,
|
||||||
fail_on_error=False)
|
fail_on_error=False)
|
||||||
reqs_to_retry = [req for broker_responses in reply
|
for i, response in enumerate(reply):
|
||||||
for response in broker_responses
|
if isinstance(response, FailedPayloadsError):
|
||||||
for req in response.failed_payloads
|
_handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
|
||||||
if isinstance(response, FailedPayloadsError)]
|
elif isinstance(response, ProduceResponse) and response.error:
|
||||||
if reqs_to_retry:
|
error_cls = kafka_errors.get(response.error, UnknownError)
|
||||||
error_type = FailedPayloadsError
|
_handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
|
||||||
|
|
||||||
except RequestTimedOutError:
|
|
||||||
error_type = RequestTimedOutError
|
|
||||||
if retry_options.retry_on_timeouts:
|
|
||||||
reqs_to_retry = reqs.keys()
|
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
error_type = type(ex)
|
error_cls = kafka_errors.get(type(ex), UnknownError)
|
||||||
if type(ex) in RETRY_ERROR_TYPES:
|
_handle_error(error_cls, reqs.keys(), reqs_to_retry)
|
||||||
reqs_to_retry = reqs.keys()
|
|
||||||
|
|
||||||
if not reqs_to_retry:
|
if not reqs_to_retry:
|
||||||
reqs = {}
|
reqs = {}
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# doing backoff before next retry
|
# doing backoff before next retry
|
||||||
if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
|
if do_backoff and retry_options.backoff_ms:
|
||||||
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
|
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
|
||||||
time.sleep(float(retry_options.backoff_ms) / 1000)
|
time.sleep(float(retry_options.backoff_ms) / 1000)
|
||||||
|
|
||||||
# refresh topic metadata before next retry
|
# refresh topic metadata before next retry
|
||||||
if error_type in RETRY_REFRESH_ERROR_TYPES:
|
if do_refresh:
|
||||||
client.load_metadata_for_topics()
|
client.load_metadata_for_topics()
|
||||||
|
|
||||||
reqs = dict((key, count + 1) for (key, count) in reqs.items()
|
reqs = dict((key, count + 1) for (key, count) in reqs.items()
|
||||||
|
@@ -143,7 +143,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
|
|||||||
def send_side_effect(reqs, *args, **kwargs):
|
def send_side_effect(reqs, *args, **kwargs):
|
||||||
if self.client.is_first_time:
|
if self.client.is_first_time:
|
||||||
self.client.is_first_time = False
|
self.client.is_first_time = False
|
||||||
return [[FailedPayloadsError(reqs)]]
|
return [FailedPayloadsError(reqs)]
|
||||||
return []
|
return []
|
||||||
|
|
||||||
self.client.send_produce_request.side_effect = send_side_effect
|
self.client.send_produce_request.side_effect = send_side_effect
|
||||||
@@ -165,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
|
|||||||
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
|
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
|
||||||
|
|
||||||
def send_side_effect(reqs, *args, **kwargs):
|
def send_side_effect(reqs, *args, **kwargs):
|
||||||
return [[FailedPayloadsError(reqs)]]
|
return [FailedPayloadsError(reqs)]
|
||||||
|
|
||||||
self.client.send_produce_request.side_effect = send_side_effect
|
self.client.send_produce_request.side_effect = send_side_effect
|
||||||
|
|
||||||
|
1
tox.ini
1
tox.ini
@@ -14,6 +14,7 @@ commands =
|
|||||||
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
|
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
|
||||||
setenv =
|
setenv =
|
||||||
PROJECT_ROOT = {toxinidir}
|
PROJECT_ROOT = {toxinidir}
|
||||||
|
passenv = KAFKA_VERSION
|
||||||
|
|
||||||
[testenv:py33]
|
[testenv:py33]
|
||||||
deps =
|
deps =
|
||||||
|
Reference in New Issue
Block a user