Merge pull request #31 from mahendra/lazythread

Optimize auto-commit process
This commit is contained in:
David Arthur
2013-06-25 05:56:08 -07:00
2 changed files with 40 additions and 26 deletions

View File

@@ -58,7 +58,7 @@ class SimpleConsumer(object):
# Set up the auto-commit timer # Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None: if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t, self.commit_timer = ReentrantTimer(auto_commit_every_t,
self._timed_commit) self.commit)
self.commit_timer.start() self.commit_timer.start()
def get_or_init_offset_callback(resp): def get_or_init_offset_callback(resp):
@@ -150,15 +150,6 @@ class SimpleConsumer(object):
return total return total
def _timed_commit(self):
"""
Commit offsets as part of timer
"""
self.commit()
# Once the commit is done, start the timer again
self.commit_timer.start()
def commit(self, partitions=[]): def commit(self, partitions=[]):
""" """
Commit offsets for this consumer Commit offsets for this consumer
@@ -167,11 +158,17 @@ class SimpleConsumer(object):
all of them all of them
""" """
# short circuit if nothing happened # short circuit if nothing happened. This check is kept outside
# to prevent un-necessarily acquiring a lock for checking the state
if self.count_since_commit == 0: if self.count_since_commit == 0:
return return
with self.commit_lock: with self.commit_lock:
# Do this check again, just in case the state has changed
# during the lock acquiring timeout
if self.count_since_commit == 0:
return
reqs = [] reqs = []
if len(partitions) == 0: # commit all partitions if len(partitions) == 0: # commit all partitions
partitions = self.offsets.keys() partitions = self.offsets.keys()
@@ -201,12 +198,7 @@ class SimpleConsumer(object):
return return
if self.count_since_commit > self.auto_commit_every_n: if self.count_since_commit > self.auto_commit_every_n:
if self.commit_timer is not None: self.commit()
self.commit_timer.stop()
self.commit()
self.commit_timer.start()
else:
self.commit()
def __iter__(self): def __iter__(self):
""" """

View File

@@ -1,7 +1,7 @@
from collections import defaultdict from collections import defaultdict
from itertools import groupby from itertools import groupby
import struct import struct
from threading import Timer from threading import Thread, Event
def write_int_string(s): def write_int_string(s):
@@ -81,19 +81,41 @@ class ReentrantTimer(object):
t: timer interval in milliseconds t: timer interval in milliseconds
fn: a callable to invoke fn: a callable to invoke
args: tuple of args to be passed to function
kwargs: keyword arguments to be passed to function
""" """
def __init__(self, t, fn): def __init__(self, t, fn, *args, **kwargs):
self.timer = None
self.t = t if t <= 0:
raise ValueError('Invalid timeout value')
if not callable(fn):
raise ValueError('fn must be callable')
self.thread = None
self.t = t / 1000.0
self.fn = fn self.fn = fn
self.args = args
self.kwargs = kwargs
self.active = None
def _timer(self, active):
while not active.wait(self.t):
self.fn(*self.args, **self.kwargs)
def start(self): def start(self):
if self.timer is not None: if self.thread is not None:
self.timer.cancel() self.stop()
self.timer = Timer(self.t / 1000., self.fn) self.active = Event()
self.timer.start() self.thread = Thread(target=self._timer, args=(self.active,))
self.thread.daemon = True # So the app exits when main thread exits
self.thread.start()
def stop(self): def stop(self):
self.timer.cancel() if self.thread is None:
return
self.active.set()
self.thread.join(self.t + 1)
self.timer = None self.timer = None