Add a limit to fetch buffer size, and actually retry requests when fetch size is too small
Note: this can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyway, and in that case the fault lies with the client for not allowing a large enough buffer size, not with the Kafka server. This can be bad when max_buffer_size is None (no limit) and some message in Kafka is enormous, which is exactly why a max_buffer_size should be set.
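As a rough illustration of that trade-off (hypothetical numbers, not part of this commit): starting from the 4K default, a 1 MB message needs eight doublings, i.e. eight extra fetch round trips, before it fits.

    # Hypothetical back-of-envelope, not part of this commit: count the
    # doubling retries needed before a 4096-byte buffer fits a 1 MB message.
    buffer_size = 4096
    message_size = 1024 * 1024
    retries = 0
    while buffer_size < message_size:
        buffer_size *= 2
        retries += 1
    print retries  # 8 extra fetch round trips before the message fits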
@@ -23,6 +23,7 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1
 FETCH_MAX_WAIT_TIME = 100
 FETCH_MIN_BYTES = 4096
 FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
 
 ITER_TIMEOUT_SECONDS = 60
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
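For reference, the new cap works out to 8 × 4096 = 32768 bytes, i.e. three doublings above the default buffer:

    FETCH_BUFFER_SIZE_BYTES = 4096
    MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
    # Growth path under the defaults: 4096 -> 8192 -> 16384 -> 32768
    assert MAX_FETCH_BUFFER_SIZE_BYTES == 32768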
@@ -211,8 +212,10 @@ class SimpleConsumer(Consumer):
         auto_commit_every_t: default 5000. How much time (in milliseconds) to
                              wait before commit
         fetch_size_bytes: number of bytes to request in a FetchRequest
-        buffer_size: initial number of bytes to tell kafka we have
-                     available. This will double every time it's not enough
+        buffer_size: default 4K. Initial number of bytes to tell kafka we
+                     have available. This will double as needed.
+        max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+                         available. None means no limit.
         iter_timeout: default None. How much time (in seconds) to wait for a
                       message in the iterator before exiting. None means no
                       timeout, so it will wait forever.
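A minimal usage sketch of the parameters documented above, assuming the KafkaClient API of this era (broker address, group, and topic are made up for illustration):

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)           # hypothetical broker
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              buffer_size=4096,       # initial fetch buffer (4K)
                              max_buffer_size=32768)  # cap; None means no limit
    for message in consumer:
        print message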
@@ -228,9 +231,15 @@ class SimpleConsumer(Consumer):
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
                  fetch_size_bytes=FETCH_MIN_BYTES,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
+                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None):
 
+        if max_buffer_size is not None and buffer_size > max_buffer_size:
+            raise ValueError("buffer_size (%d) is greater than "
+                             "max_buffer_size (%d)" %
+                             (buffer_size, max_buffer_size))
         self.buffer_size = buffer_size
+        self.max_buffer_size = max_buffer_size
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
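With the new guard, a misconfigured pair of sizes fails fast at construction time instead of surfacing later as a confusing fetch error. Reusing the hypothetical client from the sketch above:

    try:
        consumer = SimpleConsumer(client, "my-group", "my-topic",
                                  buffer_size=65536,
                                  max_buffer_size=32768)
    except ValueError, e:
        # buffer_size (65536) is greater than max_buffer_size (32768)
        print e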
@@ -353,42 +362,54 @@ class SimpleConsumer(Consumer):
         # Create fetch request payloads for all the partitions
         requests = []
         partitions = self.offsets.keys()
-        for partition in partitions:
-            requests.append(FetchRequest(self.topic, partition,
-                                         self.offsets[partition],
-                                         self.buffer_size))
-        # Send request
-        responses = self.client.send_fetch_request(
-            requests,
-            max_wait_time=int(self.fetch_max_wait_time),
-            min_bytes=self.fetch_min_bytes)
-
-        for resp in responses:
-            partition = resp.partition
-            try:
-                for message in resp.messages:
-                    # Update partition offset
-                    self.offsets[partition] = message.offset + 1
-
-                    # Count, check and commit messages if necessary
-                    self.count_since_commit += 1
-                    self._auto_commit()
-
-                    # Put the message in our queue
-                    if self.partition_info:
-                        self.queue.put((partition, message))
-                    else:
-                        self.queue.put(message)
-            except ConsumerFetchSizeTooSmall, e:
-                self.buffer_size *= 2
-                log.warn("Fetch size too small, increase to %d (2x) and retry",
-                         self.buffer_size)
-            except ConsumerNoMoreData, e:
-                log.debug("Iteration was ended by %r", e)
-            except StopIteration:
-                # Stop iterating through this partition
-                log.debug("Done iterating over partition %s" % partition)
+        while partitions:
+            for partition in partitions:
+                requests.append(FetchRequest(self.topic, partition,
+                                             self.offsets[partition],
+                                             self.buffer_size))
+            # Send request
+            responses = self.client.send_fetch_request(
+                requests,
+                max_wait_time=int(self.fetch_max_wait_time),
+                min_bytes=self.fetch_min_bytes)
+
+            retry_partitions = set()
+            for resp in responses:
+                partition = resp.partition
+                try:
+                    for message in resp.messages:
+                        # Update partition offset
+                        self.offsets[partition] = message.offset + 1
+
+                        # Count, check and commit messages if necessary
+                        self.count_since_commit += 1
+                        self._auto_commit()
+
+                        # Put the message in our queue
+                        if self.partition_info:
+                            self.queue.put((partition, message))
+                        else:
+                            self.queue.put(message)
+                except ConsumerFetchSizeTooSmall, e:
+                    if (self.max_buffer_size is not None and
+                            self.buffer_size == self.max_buffer_size):
+                        log.error("Max fetch size %d too small",
+                                  self.max_buffer_size)
+                        raise e
+                    if self.max_buffer_size is None:
+                        self.buffer_size *= 2
+                    else:
+                        self.buffer_size = max([self.buffer_size * 2,
+                                                self.max_buffer_size])
+                    log.warn("Fetch size too small, increase to %d (2x) "
+                             "and retry", self.buffer_size)
+                    retry_partitions.add(partition)
+                except ConsumerNoMoreData, e:
+                    log.debug("Iteration was ended by %r", e)
+                except StopIteration:
+                    # Stop iterating through this partition
+                    log.debug("Done iterating over partition %s" % partition)
+            partitions = retry_partitions
 
 
 def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     """
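Stripped of the Kafka plumbing, the retry pattern this hunk introduces can be sketched standalone as below (all names hypothetical; this sketch caps growth with min() so the doubled size never exceeds the limit):

    class FetchSizeTooSmall(Exception):
        pass

    def fetch_all(fetch, partitions, buffer_size, max_buffer_size):
        # Keep fetching until every partition has been served, growing the
        # buffer and retrying only the partitions whose data did not fit.
        while partitions:
            retry = set()
            for partition in partitions:
                try:
                    fetch(partition, buffer_size)
                except FetchSizeTooSmall:
                    if buffer_size == max_buffer_size:
                        raise                    # already at the cap; give up
                    if max_buffer_size is None:
                        buffer_size *= 2         # unbounded doubling
                    else:
                        # cap the doubled size at the configured limit
                        buffer_size = min(buffer_size * 2, max_buffer_size)
                    retry.add(partition)
            partitions = retry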