Cherry-pick mrtheb/kafka-python 2b016b69
Set FetchRequest MaxBytes value to bufsize instead of fetchsize (=MinBytes)
This commit is contained in:
@@ -359,7 +359,10 @@ class SimpleConsumer(Consumer):
|
|||||||
fetch_size = self.fetch_min_bytes
|
fetch_size = self.fetch_min_bytes
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
req = FetchRequest(self.topic, partition, offset, fetch_size)
|
# use MaxBytes = client's bufsize since we're only
|
||||||
|
# fetching one topic + partition
|
||||||
|
req = FetchRequest(
|
||||||
|
self.topic, partition, offset, self.client.bufsize)
|
||||||
|
|
||||||
(resp,) = self.client.send_fetch_request([req],
|
(resp,) = self.client.send_fetch_request([req],
|
||||||
max_wait_time=self.fetch_max_wait_time,
|
max_wait_time=self.fetch_max_wait_time,
|
||||||
|
@@ -38,7 +38,8 @@ def read_short_string(data, cur):
|
|||||||
|
|
||||||
def read_int_string(data, cur):
|
def read_int_string(data, cur):
|
||||||
if len(data) < cur + 4:
|
if len(data) < cur + 4:
|
||||||
raise BufferUnderflowError("Not enough data left")
|
raise BufferUnderflowError(
|
||||||
|
"Not enough data left to read string len (%d < %d)" % (len(data), cur + 4))
|
||||||
|
|
||||||
(strLen,) = struct.unpack('>i', data[cur:cur + 4])
|
(strLen,) = struct.unpack('>i', data[cur:cur + 4])
|
||||||
if strLen == -1:
|
if strLen == -1:
|
||||||
@@ -46,7 +47,8 @@ def read_int_string(data, cur):
|
|||||||
|
|
||||||
cur += 4
|
cur += 4
|
||||||
if len(data) < cur + strLen:
|
if len(data) < cur + strLen:
|
||||||
raise BufferUnderflowError("Not enough data left")
|
raise BufferUnderflowError(
|
||||||
|
"Not enough data left to read string (%d < %d)" % (len(data), cur + strLen))
|
||||||
|
|
||||||
out = data[cur:cur + strLen]
|
out = data[cur:cur + strLen]
|
||||||
return (out, cur + strLen)
|
return (out, cur + strLen)
|
||||||
@@ -68,7 +70,6 @@ def group_by_topic_and_partition(tuples):
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ReentrantTimer(object):
|
class ReentrantTimer(object):
|
||||||
"""
|
"""
|
||||||
A timer that can be restarted, unlike threading.Timer
|
A timer that can be restarted, unlike threading.Timer
|
||||||
|
Reference in New Issue
Block a user