Improve async producer code: logic and style fixes
- call send_produce_request with fail_on_error=False to retry failed reqs only - use an internal dict with namedtuple keys for retry counters - refresh metadata on refresh_error irrespective of retry options - removed infinite retries (retry_options.limit=None) as an over-feature - separate producer init args for retry options (limit, backoff, on_timeouts) - AsyncProducerQueueFull carries a list of failed messages - producer tests improved thanks to @rogaha and @toli
This commit is contained in:
@@ -14,15 +14,8 @@ MetadataResponse = namedtuple("MetadataResponse",
|
||||
["brokers", "topics"])
|
||||
|
||||
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
|
||||
_ProduceRequest = namedtuple("ProduceRequest",
|
||||
["topic", "partition", "messages", "retries"])
|
||||
|
||||
|
||||
class ProduceRequest(_ProduceRequest):
|
||||
def __new__(cls, topic, partition, messages, retries=0):
|
||||
return super(ProduceRequest, cls).__new__(
|
||||
cls, topic, partition, messages, retries)
|
||||
|
||||
ProduceRequest = namedtuple("ProduceRequest",
|
||||
["topic", "partition", "messages"])
|
||||
|
||||
ProduceResponse = namedtuple("ProduceResponse",
|
||||
["topic", "partition", "error", "offset"])
|
||||
@@ -79,7 +72,7 @@ KafkaMessage = namedtuple("KafkaMessage",
|
||||
["topic", "partition", "offset", "key", "value"])
|
||||
|
||||
# Define retry policy for async producer
|
||||
# Limit corner values: None - infinite retries, 0 - no retries
|
||||
# Limit value: int >= 0, 0 means no retries
|
||||
RetryOptions = namedtuple("RetryOptions",
|
||||
["limit", "backoff_ms", "retry_on_timeouts"])
|
||||
|
||||
@@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError):
|
||||
|
||||
|
||||
class AsyncProducerQueueFull(KafkaError):
|
||||
pass
|
||||
def __init__(self, failed_msgs, *args):
|
||||
super(AsyncProducerQueueFull, self).__init__(*args)
|
||||
self.failed_msgs = failed_msgs
|
||||
|
||||
|
||||
def _iter_broker_errors():
|
||||
|
@@ -34,8 +34,10 @@ BATCH_SEND_MSG_COUNT = 20
|
||||
ASYNC_QUEUE_MAXSIZE = 0
|
||||
ASYNC_QUEUE_PUT_TIMEOUT = 0
|
||||
# no retries by default
|
||||
ASYNC_RETRY_OPTIONS = RetryOptions(
|
||||
limit=0, backoff_ms=0, retry_on_timeouts=False)
|
||||
ASYNC_RETRY_LIMIT = 0
|
||||
ASYNC_RETRY_BACKOFF_MS = 0
|
||||
ASYNC_RETRY_ON_TIMEOUTS = False
|
||||
|
||||
STOP_ASYNC_PRODUCER = -1
|
||||
|
||||
|
||||
@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
a specified timeout and send them upstream to the brokers in one
|
||||
request
|
||||
"""
|
||||
reqs = []
|
||||
reqs = {}
|
||||
client.reinit()
|
||||
|
||||
while not stop_event.is_set():
|
||||
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
messages = create_message_set(msg, codec, key)
|
||||
req = ProduceRequest(topic_partition.topic,
|
||||
topic_partition.partition,
|
||||
messages)
|
||||
reqs.append(req)
|
||||
tuple(messages))
|
||||
reqs[req] = 0
|
||||
|
||||
if not reqs:
|
||||
continue
|
||||
|
||||
reqs_to_retry, error_type = [], None
|
||||
try:
|
||||
client.send_produce_request(reqs,
|
||||
acks=req_acks,
|
||||
timeout=ack_timeout)
|
||||
|
||||
except FailedPayloadsError as ex:
|
||||
error_type = FailedPayloadsError
|
||||
reqs_to_retry = ex.failed_payloads
|
||||
try:
|
||||
reply = client.send_produce_request(reqs.keys(),
|
||||
acks=req_acks,
|
||||
timeout=ack_timeout,
|
||||
fail_on_error=False)
|
||||
reqs_to_retry = [req for broker_responses in reply
|
||||
for response in broker_responses
|
||||
for req in response.failed_payloads
|
||||
if isinstance(response, FailedPayloadsError)]
|
||||
if reqs_to_retry:
|
||||
error_type = FailedPayloadsError
|
||||
|
||||
except RequestTimedOutError:
|
||||
error_type = RequestTimedOutError
|
||||
if retry_options.retry_on_timeouts:
|
||||
reqs_to_retry = reqs
|
||||
reqs_to_retry = reqs.keys()
|
||||
|
||||
except Exception as ex:
|
||||
error_type = type(ex)
|
||||
if type(ex) in RETRY_ERROR_TYPES:
|
||||
reqs_to_retry = reqs
|
||||
reqs_to_retry = reqs.keys()
|
||||
|
||||
finally:
|
||||
reqs = []
|
||||
|
||||
if not reqs_to_retry or retry_options.limit == 0:
|
||||
if not reqs_to_retry:
|
||||
reqs = {}
|
||||
continue
|
||||
|
||||
# doing backoff before next retry
|
||||
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
if error_type in RETRY_REFRESH_ERROR_TYPES:
|
||||
client.load_metadata_for_topics()
|
||||
|
||||
reqs = [req._replace(retries=req.retries+1)
|
||||
for req in reqs_to_retry
|
||||
if not retry_options.limit or
|
||||
(retry_options.limit and req.retries < retry_options.limit)]
|
||||
reqs = {key: count + 1 for key, count in reqs.items()
|
||||
if key in reqs_to_retry and count < retry_options.limit}
|
||||
|
||||
|
||||
class Producer(object):
|
||||
@@ -161,7 +163,9 @@ class Producer(object):
|
||||
batch_send=False,
|
||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||
async_retry_options=ASYNC_RETRY_OPTIONS,
|
||||
async_retry_limit=ASYNC_RETRY_LIMIT,
|
||||
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
|
||||
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
|
||||
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
|
||||
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
|
||||
|
||||
@@ -191,6 +195,10 @@ class Producer(object):
|
||||
# Messages are sent through this queue
|
||||
self.queue = Queue(async_queue_maxsize)
|
||||
self.async_queue_put_timeout = async_queue_put_timeout
|
||||
async_retry_options = RetryOptions(
|
||||
limit=async_retry_limit,
|
||||
backoff_ms=async_retry_backoff_ms,
|
||||
retry_on_timeouts=async_retry_on_timeouts)
|
||||
self.thread_stop_event = Event()
|
||||
self.thread = Thread(target=_send_upstream,
|
||||
args=(self.queue,
|
||||
@@ -252,7 +260,7 @@ class Producer(object):
|
||||
raise TypeError("the key must be type bytes")
|
||||
|
||||
if self.async:
|
||||
for m in msg:
|
||||
for idx, m in enumerate(msg):
|
||||
try:
|
||||
item = (TopicAndPartition(topic, partition), m, key)
|
||||
if self.async_queue_put_timeout == 0:
|
||||
@@ -261,6 +269,7 @@ class Producer(object):
|
||||
self.queue.put(item, True, self.async_queue_put_timeout)
|
||||
except Full:
|
||||
raise AsyncProducerQueueFull(
|
||||
msg[idx:],
|
||||
'Producer async queue overfilled. '
|
||||
'Current queue size %d.' % self.queue.qsize())
|
||||
resp = []
|
||||
|
@@ -7,8 +7,8 @@ from kafka.util import kafka_bytestring
|
||||
|
||||
from .base import (
|
||||
Producer, BATCH_SEND_DEFAULT_INTERVAL,
|
||||
BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
|
||||
ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
|
||||
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
|
||||
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
|
||||
)
|
||||
|
||||
log = logging.getLogger("kafka")
|
||||
@@ -39,7 +39,9 @@ class KeyedProducer(Producer):
|
||||
batch_send=False,
|
||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||
async_retry_options=ASYNC_RETRY_OPTIONS,
|
||||
async_retry_limit=ASYNC_RETRY_LIMIT,
|
||||
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
|
||||
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
|
||||
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
|
||||
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
|
||||
if not partitioner:
|
||||
@@ -51,7 +53,9 @@ class KeyedProducer(Producer):
|
||||
ack_timeout, codec, batch_send,
|
||||
batch_send_every_n,
|
||||
batch_send_every_t,
|
||||
async_retry_options,
|
||||
async_retry_limit,
|
||||
async_retry_backoff_ms,
|
||||
async_retry_on_timeouts,
|
||||
async_queue_maxsize,
|
||||
async_queue_put_timeout)
|
||||
|
||||
|
@@ -10,8 +10,8 @@ from six.moves import xrange
|
||||
|
||||
from .base import (
|
||||
Producer, BATCH_SEND_DEFAULT_INTERVAL,
|
||||
BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
|
||||
ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
|
||||
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
|
||||
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
|
||||
)
|
||||
|
||||
log = logging.getLogger("kafka")
|
||||
@@ -47,7 +47,9 @@ class SimpleProducer(Producer):
|
||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||
random_start=True,
|
||||
async_retry_options=ASYNC_RETRY_OPTIONS,
|
||||
async_retry_limit=ASYNC_RETRY_LIMIT,
|
||||
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
|
||||
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
|
||||
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
|
||||
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
|
||||
self.partition_cycles = {}
|
||||
@@ -56,7 +58,9 @@ class SimpleProducer(Producer):
|
||||
ack_timeout, codec, batch_send,
|
||||
batch_send_every_n,
|
||||
batch_send_every_t,
|
||||
async_retry_options,
|
||||
async_retry_limit,
|
||||
async_retry_backoff_ms,
|
||||
async_retry_on_timeouts,
|
||||
async_queue_maxsize,
|
||||
async_queue_put_timeout)
|
||||
|
||||
|
@@ -17,6 +17,10 @@ try:
|
||||
from queue import Empty, Queue
|
||||
except ImportError:
|
||||
from Queue import Empty, Queue
|
||||
try:
|
||||
xrange
|
||||
except NameError:
|
||||
xrange = range
|
||||
|
||||
|
||||
class TestKafkaProducer(unittest.TestCase):
|
||||
@@ -52,7 +56,8 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
producer.send_messages(topic, b'hi')
|
||||
assert client.send_produce_request.called
|
||||
|
||||
def test_producer_async_queue_overfilled_batch_send(self):
|
||||
@patch('kafka.producer.base._send_upstream')
|
||||
def test_producer_async_queue_overfilled_batch_send(self, mock):
|
||||
queue_size = 2
|
||||
producer = Producer(MagicMock(), batch_send=True,
|
||||
async_queue_maxsize=queue_size)
|
||||
@@ -64,8 +69,12 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
with self.assertRaises(AsyncProducerQueueFull):
|
||||
message_list = [message] * (queue_size + 1)
|
||||
producer.send_messages(topic, partition, *message_list)
|
||||
self.assertEqual(producer.queue.qsize(), queue_size)
|
||||
for _ in xrange(producer.queue.qsize()):
|
||||
producer.queue.get()
|
||||
|
||||
def test_producer_async_queue_overfilled(self):
|
||||
@patch('kafka.producer.base._send_upstream')
|
||||
def test_producer_async_queue_overfilled(self, mock):
|
||||
queue_size = 2
|
||||
producer = Producer(MagicMock(), async=True,
|
||||
async_queue_maxsize=queue_size)
|
||||
@@ -77,7 +86,9 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
with self.assertRaises(AsyncProducerQueueFull):
|
||||
message_list = [message] * (queue_size + 1)
|
||||
producer.send_messages(topic, partition, *message_list)
|
||||
|
||||
self.assertEqual(producer.queue.qsize(), queue_size)
|
||||
for _ in xrange(producer.queue.qsize()):
|
||||
producer.queue.get()
|
||||
|
||||
|
||||
class TestKafkaProducerSendUpstream(unittest.TestCase):
|
||||
@@ -121,7 +132,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
|
||||
# 3 batches of 3 msgs each + 1 batch of 1 message
|
||||
self.assertEqual(self.client.send_produce_request.call_count, 4)
|
||||
|
||||
|
||||
def test_first_send_failed(self):
|
||||
|
||||
# lets create a queue and add 10 messages for 10 different partitions
|
||||
@@ -133,7 +143,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
|
||||
def send_side_effect(reqs, *args, **kwargs):
|
||||
if self.client.is_first_time:
|
||||
self.client.is_first_time = False
|
||||
raise FailedPayloadsError(reqs)
|
||||
return [[FailedPayloadsError(reqs)]]
|
||||
return []
|
||||
|
||||
self.client.send_produce_request.side_effect = send_side_effect
|
||||
|
||||
@@ -154,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
|
||||
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
|
||||
|
||||
def send_side_effect(reqs, *args, **kwargs):
|
||||
raise FailedPayloadsError(reqs)
|
||||
return [[FailedPayloadsError(reqs)]]
|
||||
|
||||
self.client.send_produce_request.side_effect = send_side_effect
|
||||
|
||||
@@ -168,30 +179,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
|
||||
# 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
|
||||
self.assertEqual(self.client.send_produce_request.call_count, 16)
|
||||
|
||||
def test_with_unlimited_retries(self):
|
||||
|
||||
# lets create a queue and add 10 messages for 10 different partitions
|
||||
# to show how retries should work ideally
|
||||
for i in range(10):
|
||||
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
|
||||
|
||||
def send_side_effect(reqs, *args, **kwargs):
|
||||
raise FailedPayloadsError(reqs)
|
||||
|
||||
self.client.send_produce_request.side_effect = send_side_effect
|
||||
|
||||
self._run_process(None)
|
||||
|
||||
# the queue should have 7 elements
|
||||
# 3 batches of 1 msg each were retried all this time
|
||||
self.assertEqual(self.queue.empty(), False)
|
||||
try:
|
||||
for i in range(7):
|
||||
self.queue.get(timeout=0.01)
|
||||
except Empty:
|
||||
self.fail("Should be 7 elems in the queue")
|
||||
self.assertEqual(self.queue.empty(), True)
|
||||
|
||||
# 1s / 50ms of backoff = 20 times max
|
||||
calls = self.client.send_produce_request.call_count
|
||||
self.assertTrue(calls > 10 & calls <= 20)
|
||||
def tearDown(self):
|
||||
for _ in xrange(self.queue.qsize()):
|
||||
self.queue.get()
|
||||
|
Reference in New Issue
Block a user