Optimize auto-commit thread
The previous commit optimized the commit thread so that the timer started only when there were messages to be consumed. This commit goes a step further and ensures the following:
* Only one timer thread is created
* The main app does not block on exit (waiting for the timer thread to finish)
This is ensured by having a single thread that blocks on an event and repeatedly calls a function. We use events instead of time.sleep() to prevent the Python interpreter from waking up every 50ms to check whether the timer has expired (logic copied from threading.Timer).
This commit is contained in:
@@ -58,7 +58,8 @@ class SimpleConsumer(object):
|
|||||||
# Set up the auto-commit timer
|
# Set up the auto-commit timer
|
||||||
if auto_commit is True and auto_commit_every_t is not None:
|
if auto_commit is True and auto_commit_every_t is not None:
|
||||||
self.commit_timer = ReentrantTimer(auto_commit_every_t,
|
self.commit_timer = ReentrantTimer(auto_commit_every_t,
|
||||||
self._timed_commit)
|
self.commit)
|
||||||
|
self.commit_timer.start()
|
||||||
|
|
||||||
def get_or_init_offset_callback(resp):
|
def get_or_init_offset_callback(resp):
|
||||||
if resp.error == ErrorMapping.NO_ERROR:
|
if resp.error == ErrorMapping.NO_ERROR:
|
||||||
@@ -149,15 +150,6 @@ class SimpleConsumer(object):
|
|||||||
|
|
||||||
return total
|
return total
|
||||||
|
|
||||||
def _timed_commit(self):
|
|
||||||
"""
|
|
||||||
Commit offsets as part of timer
|
|
||||||
"""
|
|
||||||
self.commit()
|
|
||||||
|
|
||||||
# Once the commit is done, start the timer again
|
|
||||||
# self.commit_timer.start()
|
|
||||||
|
|
||||||
def commit(self, partitions=[]):
|
def commit(self, partitions=[]):
|
||||||
"""
|
"""
|
||||||
Commit offsets for this consumer
|
Commit offsets for this consumer
|
||||||
@@ -166,11 +158,17 @@ class SimpleConsumer(object):
|
|||||||
all of them
|
all of them
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# short circuit if nothing happened
|
# short circuit if nothing happened. This check is kept outside
|
||||||
|
# to avoid unnecessarily acquiring a lock just to check the state
|
||||||
if self.count_since_commit == 0:
|
if self.count_since_commit == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
with self.commit_lock:
|
with self.commit_lock:
|
||||||
|
# Do this check again, just in case the state has changed
|
||||||
|
# during the lock acquiring timeout
|
||||||
|
if self.count_since_commit == 0:
|
||||||
|
return
|
||||||
|
|
||||||
reqs = []
|
reqs = []
|
||||||
if len(partitions) == 0: # commit all partitions
|
if len(partitions) == 0: # commit all partitions
|
||||||
partitions = self.offsets.keys()
|
partitions = self.offsets.keys()
|
||||||
@@ -200,12 +198,7 @@ class SimpleConsumer(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if self.count_since_commit > self.auto_commit_every_n:
|
if self.count_since_commit > self.auto_commit_every_n:
|
||||||
if self.commit_timer is not None:
|
self.commit()
|
||||||
self.commit_timer.stop()
|
|
||||||
self.commit()
|
|
||||||
self.commit_timer.start()
|
|
||||||
else:
|
|
||||||
self.commit()
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
"""
|
"""
|
||||||
@@ -223,10 +216,6 @@ class SimpleConsumer(object):
|
|||||||
if len(iters) == 0:
|
if len(iters) == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
# Now that we are consuming data, start the commit thread
|
|
||||||
if self.commit_timer and not self.commit_timer.is_active:
|
|
||||||
self.commit_timer.start()
|
|
||||||
|
|
||||||
for partition, it in iters.items():
|
for partition, it in iters.items():
|
||||||
try:
|
try:
|
||||||
yield it.next()
|
yield it.next()
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from itertools import groupby
|
from itertools import groupby
|
||||||
import struct
|
import struct
|
||||||
from threading import Timer
|
from threading import Thread, Event
|
||||||
|
|
||||||
|
|
||||||
def write_int_string(s):
|
def write_int_string(s):
|
||||||
@@ -81,22 +81,41 @@ class ReentrantTimer(object):
|
|||||||
|
|
||||||
t: timer interval in milliseconds
|
t: timer interval in milliseconds
|
||||||
fn: a callable to invoke
|
fn: a callable to invoke
|
||||||
|
args: tuple of args to be passed to function
|
||||||
|
kwargs: keyword arguments to be passed to function
|
||||||
"""
|
"""
|
||||||
def __init__(self, t, fn):
|
def __init__(self, t, fn, *args, **kwargs):
|
||||||
self.timer = None
|
|
||||||
self.t = t
|
if t <= 0:
|
||||||
|
raise ValueError('Invalid timeout value')
|
||||||
|
|
||||||
|
if not callable(fn):
|
||||||
|
raise ValueError('fn must be callable')
|
||||||
|
|
||||||
|
self.thread = None
|
||||||
|
self.t = t / 1000.0
|
||||||
self.fn = fn
|
self.fn = fn
|
||||||
self.is_active = False
|
self.args = args
|
||||||
|
self.kwargs = kwargs
|
||||||
|
self.active = None
|
||||||
|
|
||||||
|
def _timer(self, active):
|
||||||
|
while not active.wait(self.t):
|
||||||
|
self.fn(*self.args, **self.kwargs)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if self.timer is not None:
|
if self.thread is not None:
|
||||||
self.timer.cancel()
|
self.stop()
|
||||||
|
|
||||||
self.timer = Timer(self.t / 1000., self.fn)
|
self.active = Event()
|
||||||
self.is_active = True
|
self.thread = Thread(target=self._timer, args=(self.active))
|
||||||
self.timer.start()
|
self.thread.daemon = True # So the app exits when main thread exits
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.timer.cancel()
|
if self.thread is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.active.set()
|
||||||
|
self.thread.join(self.t + 1)
|
||||||
self.timer = None
|
self.timer = None
|
||||||
self.is_active = False
|
|
||||||
|
|||||||
Reference in New Issue
Block a user