From a4601d3a1bf6792e0d57e600f48e891ef2be1528 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 11 Jun 2013 14:44:45 +0530 Subject: [PATCH 1/3] Spawn the commit thread only if necessary If there are no messages being consumed, the timer keeps creating new threads at the specified intervals. This may not be necessary. We can control this behaviour such that the timer thread is started only when a message is consumed --- kafka/consumer.py | 7 +++++-- kafka/util.py | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1..fe7881a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -59,7 +59,6 @@ class SimpleConsumer(object): if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, self._timed_commit) - self.commit_timer.start() def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: @@ -157,7 +156,7 @@ class SimpleConsumer(object): self.commit() # Once the commit is done, start the timer again - self.commit_timer.start() + # self.commit_timer.start() def commit(self, partitions=[]): """ @@ -224,6 +223,10 @@ class SimpleConsumer(object): if len(iters) == 0: break + # Now that we are consuming data, start the commit thread + if self.commit_timer and not self.commit_timer.is_active: + self.commit_timer.start() + for partition, it in iters.items(): try: yield it.next() diff --git a/kafka/util.py b/kafka/util.py index 10bf838..b3112d5 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -86,14 +86,17 @@ class ReentrantTimer(object): self.timer = None self.t = t self.fn = fn + self.is_active = False def start(self): if self.timer is not None: self.timer.cancel() self.timer = Timer(self.t / 1000., self.fn) + self.is_active = True self.timer.start() def stop(self): self.timer.cancel() self.timer = None + self.is_active = False From b0c87eed088936c9ec571f789d7818bf445d47b3 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 12 Jun 2013 13:42:24 +0530 Subject: [PATCH 2/3] Optimize auto-commit thread The previous commit optimized the commit thread such that the timer started only when there were messages to be consumed. This commit goes a step further and ensures the following: * Only one timer thread is created * The main app does not block on exit (waiting for timer thread to finish) This is ensured by having a single thread blocking on an event and keeps calling a function. We use events instead of time.sleep() so as to prevent the python interpreter from running every 50ms checking if the timer has expired (logic copied from threading.Timer) --- kafka/consumer.py | 31 ++++++++++--------------------- kafka/util.py | 43 +++++++++++++++++++++++++++++++------------ 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index fe7881a..f8855dc 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -58,7 +58,8 @@ class SimpleConsumer(object): # Set up the auto-commit timer if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, - self._timed_commit) + self.commit) + self.commit_timer.start() def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: @@ -149,15 +150,6 @@ class SimpleConsumer(object): return total - def _timed_commit(self): - """ - Commit offsets as part of timer - """ - self.commit() - - # Once the commit is done, start the timer again - # self.commit_timer.start() - def commit(self, partitions=[]): """ Commit offsets for this consumer @@ -166,11 +158,17 @@ class SimpleConsumer(object): all of them """ - # short circuit if nothing happened + # short circuit if nothing happened. This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state if self.count_since_commit == 0: return with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + reqs = [] if len(partitions) == 0: # commit all partitions partitions = self.offsets.keys() @@ -200,12 +198,7 @@ class SimpleConsumer(object): return if self.count_since_commit > self.auto_commit_every_n: - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - self.commit_timer.start() - else: - self.commit() + self.commit() def __iter__(self): """ @@ -223,10 +216,6 @@ class SimpleConsumer(object): if len(iters) == 0: break - # Now that we are consuming data, start the commit thread - if self.commit_timer and not self.commit_timer.is_active: - self.commit_timer.start() - for partition, it in iters.items(): try: yield it.next() diff --git a/kafka/util.py b/kafka/util.py index b3112d5..11db747 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,7 +1,7 @@ from collections import defaultdict from itertools import groupby import struct -from threading import Timer +from threading import Thread, Event def write_int_string(s): @@ -81,22 +81,41 @@ class ReentrantTimer(object): t: timer interval in milliseconds fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ - def __init__(self, t, fn): - self.timer = None - self.t = t + def __init__(self, t, fn, *args, **kwargs): + + if t <= 0: + raise ValueError('Invalid timeout value') + + if not callable(fn): + raise ValueError('fn must be callable') + + self.thread = None + self.t = t / 1000.0 self.fn = fn - self.is_active = False + self.args = args + self.kwargs = kwargs + self.active = None + + def _timer(self, active): + while not active.wait(self.t): + self.fn(*self.args, **self.kwargs) def start(self): - if self.timer is not None: - self.timer.cancel() + if self.thread is not None: + self.stop() - self.timer = Timer(self.t / 1000., self.fn) - self.is_active = True - self.timer.start() + self.active = Event() + self.thread = Thread(target=self._timer, args=(self.active)) + self.thread.daemon = True # So the app exits when main thread exits + self.thread.start() def stop(self): - self.timer.cancel() + if self.thread is None: + return + + self.active.set() + self.thread.join(self.t + 1) self.timer = None - self.is_active = False From 28884c80f3cea53220c77a4b648488442e3900dd Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 12 Jun 2013 13:58:15 +0530 Subject: [PATCH 3/3] Fix an issue with thread argument --- kafka/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/util.py b/kafka/util.py index 11db747..11178f5 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -108,7 +108,7 @@ class ReentrantTimer(object): self.stop() self.active = Event() - self.thread = Thread(target=self._timer, args=(self.active)) + self.thread = Thread(target=self._timer, args=(self.active,)) self.thread.daemon = True # So the app exits when main thread exits self.thread.start()