
Related to #42 Adds new ConsumerFetchSizeTooSmall exception that is thrown when `_decode_message_set_iter` gets a BufferUnderflowError but has not yet yielded a message In this event, SimpleConsumer will increase the fetch size by 1.5 and continue the fetching loop while _not_ increasing the offset (basically just retries the request with a larger fetch size) Once the consumer fetch size has been increased, it will remain increased while SimpleConsumer fetches from that partition
124 lines
2.9 KiB
Python
124 lines
2.9 KiB
Python
from collections import defaultdict
|
|
from itertools import groupby
|
|
import struct
|
|
from threading import Thread, Event
|
|
|
|
|
|
def write_int_string(s):
|
|
if s is None:
|
|
return struct.pack('>i', -1)
|
|
else:
|
|
return struct.pack('>i%ds' % len(s), len(s), s)
|
|
|
|
|
|
def write_short_string(s):
|
|
if s is None:
|
|
return struct.pack('>h', -1)
|
|
else:
|
|
return struct.pack('>h%ds' % len(s), len(s), s)
|
|
|
|
|
|
def read_short_string(data, cur):
|
|
if len(data) < cur + 2:
|
|
raise BufferUnderflowError("Not enough data left")
|
|
|
|
(strLen,) = struct.unpack('>h', data[cur:cur + 2])
|
|
if strLen == -1:
|
|
return (None, cur + 2)
|
|
|
|
cur += 2
|
|
if len(data) < cur + strLen:
|
|
raise BufferUnderflowError("Not enough data left")
|
|
|
|
out = data[cur:cur + strLen]
|
|
return (out, cur + strLen)
|
|
|
|
|
|
def read_int_string(data, cur):
|
|
if len(data) < cur + 4:
|
|
raise BufferUnderflowError("Not enough data left")
|
|
|
|
(strLen,) = struct.unpack('>i', data[cur:cur + 4])
|
|
if strLen == -1:
|
|
return (None, cur + 4)
|
|
|
|
cur += 4
|
|
if len(data) < cur + strLen:
|
|
raise BufferUnderflowError("Not enough data left")
|
|
|
|
out = data[cur:cur + strLen]
|
|
return (out, cur + strLen)
|
|
|
|
|
|
def relative_unpack(fmt, data, cur):
|
|
size = struct.calcsize(fmt)
|
|
if len(data) < cur + size:
|
|
raise BufferUnderflowError("Not enough data left")
|
|
|
|
out = struct.unpack(fmt, data[cur:cur + size])
|
|
return (out, cur + size)
|
|
|
|
|
|
def group_by_topic_and_partition(tuples):
|
|
out = defaultdict(dict)
|
|
for t in tuples:
|
|
out[t.topic][t.partition] = t
|
|
return out
|
|
|
|
|
|
class BufferUnderflowError(Exception):
|
|
pass
|
|
|
|
|
|
class ChecksumError(Exception):
|
|
pass
|
|
|
|
class ConsumerFetchSizeTooSmall(Exception):
|
|
pass
|
|
|
|
class ReentrantTimer(object):
|
|
"""
|
|
A timer that can be restarted, unlike threading.Timer
|
|
(although this uses threading.Timer)
|
|
|
|
t: timer interval in milliseconds
|
|
fn: a callable to invoke
|
|
args: tuple of args to be passed to function
|
|
kwargs: keyword arguments to be passed to function
|
|
"""
|
|
def __init__(self, t, fn, *args, **kwargs):
|
|
|
|
if t <= 0:
|
|
raise ValueError('Invalid timeout value')
|
|
|
|
if not callable(fn):
|
|
raise ValueError('fn must be callable')
|
|
|
|
self.thread = None
|
|
self.t = t / 1000.0
|
|
self.fn = fn
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
self.active = None
|
|
|
|
def _timer(self, active):
|
|
while not active.wait(self.t):
|
|
self.fn(*self.args, **self.kwargs)
|
|
|
|
def start(self):
|
|
if self.thread is not None:
|
|
self.stop()
|
|
|
|
self.active = Event()
|
|
self.thread = Thread(target=self._timer, args=(self.active,))
|
|
self.thread.daemon = True # So the app exits when main thread exits
|
|
self.thread.start()
|
|
|
|
def stop(self):
|
|
if self.thread is None:
|
|
return
|
|
|
|
self.active.set()
|
|
self.thread.join(self.t + 1)
|
|
self.timer = None
|