Add iter_timeout option to SimpleConsumer. If not None, it causes the iterator to exit when reached.
Also put constant timeout values in pre-defined constants
This commit is contained in:
@@ -24,6 +24,9 @@ FETCH_MAX_WAIT_TIME = 100
|
||||
FETCH_MIN_BYTES = 4096
|
||||
FETCH_BUFFER_SIZE_BYTES = 4096
|
||||
|
||||
ITER_TIMEOUT_SECONDS = 60
|
||||
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
|
||||
|
||||
|
||||
class FetchContext(object):
|
||||
"""
|
||||
@@ -210,6 +213,9 @@ class SimpleConsumer(Consumer):
|
||||
fetch_size_bytes: number of bytes to request in a FetchRequest
|
||||
buffer_size: initial number of bytes to tell kafka we have
|
||||
available. This will double every time it's not enough
|
||||
iter_timeout: default None. How much time (in seconds) to wait for a
|
||||
message in the iterator before exiting. None means no
|
||||
timeout, so it will wait forever.
|
||||
|
||||
Auto commit details:
|
||||
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
||||
@@ -221,13 +227,15 @@ class SimpleConsumer(Consumer):
|
||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
|
||||
fetch_size_bytes=FETCH_MIN_BYTES,
|
||||
buffer_size=FETCH_BUFFER_SIZE_BYTES):
|
||||
buffer_size=FETCH_BUFFER_SIZE_BYTES,
|
||||
iter_timeout=None):
|
||||
|
||||
self.buffer_size = buffer_size
|
||||
self.partition_info = False # Do not return partition info in msgs
|
||||
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
|
||||
self.fetch_min_bytes = fetch_size_bytes
|
||||
self.fetch_started = defaultdict(bool) # defaults to false
|
||||
self.iter_timeout = iter_timeout
|
||||
self.queue = Queue()
|
||||
|
||||
super(SimpleConsumer, self).__init__(
|
||||
@@ -325,14 +333,22 @@ class SimpleConsumer(Consumer):
|
||||
return None
|
||||
|
||||
def __iter__(self):
|
||||
if self.iter_timeout is None:
|
||||
timeout = ITER_TIMEOUT_SECONDS
|
||||
else:
|
||||
timeout = self.iter_timeout
|
||||
|
||||
while True:
|
||||
message = self.get_message(True, 100)
|
||||
message = self.get_message(True, timeout)
|
||||
if message:
|
||||
yield message
|
||||
elif self.iter_timeout is None:
|
||||
# We did not receive any message yet but we don't have a
|
||||
# timeout, so give up the CPU for a while before trying again
|
||||
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
|
||||
else:
|
||||
# In case we did not receive any message, give up the CPU for
|
||||
# a while before we try again
|
||||
time.sleep(0.1)
|
||||
# Timed out waiting for a message
|
||||
break
|
||||
|
||||
def _fetch(self):
|
||||
requests = []
|
||||
@@ -417,7 +433,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
|
||||
else:
|
||||
# In case we did not receive any message, give up the CPU for
|
||||
# a while before we try again
|
||||
time.sleep(0.1)
|
||||
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
|
||||
|
||||
consumer.stop()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user