Deprecate async producer batch_send kwarg -- use 'async' instead
This commit is contained in:
@@ -47,7 +47,7 @@ SimpleProducer
|
||||
# Notes:
|
||||
# * If the producer dies before the messages are sent, there will be losses
|
||||
# * Call producer.stop() to send the messages and cleanup
|
||||
producer = SimpleProducer(kafka, batch_send=True,
|
||||
producer = SimpleProducer(kafka, async=True,
|
||||
batch_send_every_n=20,
|
||||
batch_send_every_t=60)
|
||||
|
||||
|
@@ -166,32 +166,54 @@ class Producer(object):
|
||||
Base class to be used by producers
|
||||
|
||||
Arguments:
|
||||
client: The Kafka client instance to use
|
||||
async: If set to true, the messages are sent asynchronously via another
|
||||
thread (process). We will not wait for a response to these
|
||||
WARNING!!! current implementation of async producer does not
|
||||
guarantee message delivery. Use at your own risk! Or help us
|
||||
improve with a PR!
|
||||
req_acks: A value indicating the acknowledgements that the server must
|
||||
receive before responding to the request
|
||||
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
|
||||
for an acknowledgement
|
||||
batch_send: If True, messages are send in batches
|
||||
batch_send_every_n: If set, messages are send in batches of this size
|
||||
batch_send_every_t: If set, messages are send after this timeout
|
||||
"""
|
||||
client (KafkaClient): instance to use for broker communications.
|
||||
codec (kafka.protocol.ALL_CODECS): compression codec to use.
|
||||
req_acks (int, optional): A value indicating the acknowledgements that
|
||||
the server must receive before responding to the request,
|
||||
defaults to 1 (local ack).
|
||||
ack_timeout (int, optional): millisecond timeout to wait for the
|
||||
configured req_acks, defaults to 1000.
|
||||
async (bool, optional): send message using a background thread,
|
||||
defaults to False.
|
||||
batch_send_every_n (int, optional): If async is True, messages are
|
||||
sent in batches of this size, defaults to 20.
|
||||
batch_send_every_t (int or float, optional): If async is True,
|
||||
messages are sent immediately after this timeout in seconds, even
|
||||
if there are fewer than batch_send_every_n, defaults to 20.
|
||||
async_retry_limit (int, optional): number of retries for failed messages
|
||||
or None for unlimited, defaults to None / unlimited.
|
||||
async_retry_backoff_ms (int, optional): milliseconds to backoff on
|
||||
failed messages, defaults to 100.
|
||||
async_retry_on_timeouts (bool, optional): whether to retry on
|
||||
RequestTimeoutError, defaults to True.
|
||||
async_queue_maxsize (int, optional): limit to the size of the
|
||||
internal message queue in number of messages (not size), defaults
|
||||
to 0 (no limit).
|
||||
async_queue_put_timeout (int or float, optional): timeout seconds
|
||||
for queue.put in send_messages for async producers -- will only
|
||||
apply if async_queue_maxsize > 0 and the queue is Full,
|
||||
defaults to 0 (fail immediately on full queue).
|
||||
async_log_messages_on_error (bool, optional): set to False and the
|
||||
async producer will only log hash() contents on failed produce
|
||||
requests, defaults to True (log full messages). Hash logging
|
||||
will not allow you to identify the specific message that failed,
|
||||
but it will allow you to match failures with retries.
|
||||
|
||||
Deprecated Arguments:
|
||||
batch_send (bool, optional): If True, messages are sent by a background
|
||||
thread in batches, defaults to False. Deprecated, use 'async'
|
||||
"""
|
||||
ACK_NOT_REQUIRED = 0 # No ack is required
|
||||
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
|
||||
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
|
||||
|
||||
DEFAULT_ACK_TIMEOUT = 1000
|
||||
|
||||
def __init__(self, client, async=False,
|
||||
def __init__(self, client,
|
||||
req_acks=ACK_AFTER_LOCAL_WRITE,
|
||||
ack_timeout=DEFAULT_ACK_TIMEOUT,
|
||||
codec=None,
|
||||
batch_send=False,
|
||||
async=False,
|
||||
batch_send=False, # deprecated, use async
|
||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||
async_retry_limit=ASYNC_RETRY_LIMIT,
|
||||
@@ -201,14 +223,10 @@ class Producer(object):
|
||||
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
|
||||
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
|
||||
|
||||
if batch_send:
|
||||
async = True
|
||||
if async:
|
||||
assert batch_send_every_n > 0
|
||||
assert batch_send_every_t > 0
|
||||
assert async_queue_maxsize >= 0
|
||||
else:
|
||||
batch_send_every_n = 1
|
||||
batch_send_every_t = 3600
|
||||
|
||||
self.client = client
|
||||
self.async = async
|
||||
|
@@ -49,8 +49,8 @@ class KeyedProducer(Producer):
|
||||
self.partitioner_class = partitioner
|
||||
self.partitioners = {}
|
||||
|
||||
super(KeyedProducer, self).__init__(client, async, req_acks,
|
||||
ack_timeout, codec, batch_send,
|
||||
super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
|
||||
codec, async, batch_send,
|
||||
batch_send_every_n,
|
||||
batch_send_every_t,
|
||||
async_retry_limit,
|
||||
|
@@ -54,8 +54,8 @@ class SimpleProducer(Producer):
|
||||
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
|
||||
self.partition_cycles = {}
|
||||
self.random_start = random_start
|
||||
super(SimpleProducer, self).__init__(client, async, req_acks,
|
||||
ack_timeout, codec, batch_send,
|
||||
super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
|
||||
codec, async, batch_send,
|
||||
batch_send_every_n,
|
||||
batch_send_every_t,
|
||||
async_retry_limit,
|
||||
|
@@ -56,23 +56,6 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
producer.send_messages(topic, b'hi')
|
||||
assert client.send_produce_request.called
|
||||
|
||||
@patch('kafka.producer.base._send_upstream')
|
||||
def test_producer_async_queue_overfilled_batch_send(self, mock):
|
||||
queue_size = 2
|
||||
producer = Producer(MagicMock(), batch_send=True,
|
||||
async_queue_maxsize=queue_size)
|
||||
|
||||
topic = b'test-topic'
|
||||
partition = 0
|
||||
message = b'test-message'
|
||||
|
||||
with self.assertRaises(AsyncProducerQueueFull):
|
||||
message_list = [message] * (queue_size + 1)
|
||||
producer.send_messages(topic, partition, *message_list)
|
||||
self.assertEqual(producer.queue.qsize(), queue_size)
|
||||
for _ in xrange(producer.queue.qsize()):
|
||||
producer.queue.get()
|
||||
|
||||
@patch('kafka.producer.base._send_upstream')
|
||||
def test_producer_async_queue_overfilled(self, mock):
|
||||
queue_size = 2
|
||||
|
@@ -221,7 +221,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
batch_interval = 5
|
||||
producer = SimpleProducer(
|
||||
self.client,
|
||||
batch_send=True,
|
||||
async=True,
|
||||
batch_send_every_n=batch_messages,
|
||||
batch_send_every_t=batch_interval,
|
||||
random_start=False)
|
||||
@@ -287,7 +287,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
batch_interval = 5
|
||||
producer = SimpleProducer(
|
||||
self.client,
|
||||
batch_send=True,
|
||||
async=True,
|
||||
batch_send_every_n=100,
|
||||
batch_send_every_t=batch_interval,
|
||||
random_start=False)
|
||||
|
Reference in New Issue
Block a user