From c36cb618d2ba5fcba118dd8b87bf51f26074c8f1 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 15 Jan 2014 11:13:59 -0800 Subject: [PATCH 1/5] Fix offset increments: * Increment the offset before returning a message rather than when putting it in the internal queue. This prevents committing the wrong offsets. * In MultiProcessConsumer, store the offset of the next message --- kafka/consumer.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 522d6ca..657024f 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -338,7 +338,19 @@ class SimpleConsumer(Consumer): with FetchContext(self, block, timeout): self._fetch() try: - return self.queue.get_nowait() + partition, message = self.queue.get_nowait() + + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if self.partition_info: + return partition, message + else: + return message except Empty: return None @@ -380,18 +392,8 @@ class SimpleConsumer(Consumer): partition = resp.partition try: for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) - else: - self.queue.put(message) + self.queue.put((partition, message)) except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): @@ -577,12 +579,11 @@ class MultiProcessConsumer(Consumer): break # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.start.clear() - yield message - self.count_since_commit += 1 self._auto_commit() + yield message self.start.clear() @@ -624,7 +625,7 @@ class MultiProcessConsumer(Consumer): messages.append(message) # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.count_since_commit += 1 self._auto_commit() count -= 1 From 8cc36dd7a1c7691e5c26b47cb667bc48054594a0 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 15 Jan 2014 14:28:48 -0800 Subject: [PATCH 2/5] Store fetched offsets separately. Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall or if _fetch() is called multiple times for some reason. We don't want to re-fetch messages that are already in our queue, so store the offsets of the last enqueued messages from each partition. --- kafka/consumer.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 657024f..b202b23 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -235,6 +235,12 @@ class SimpleConsumer(Consumer): buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None): + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) if max_buffer_size is not None and buffer_size > max_buffer_size: raise ValueError("buffer_size (%d) is greater than " @@ -245,17 +251,10 @@ class SimpleConsumer(Consumer): self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes - self.fetch_started = defaultdict(bool) # defaults to false + self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout self.queue = Queue() - super(SimpleConsumer, self).__init__( - client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -301,6 +300,10 @@ class SimpleConsumer(Consumer): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + self.queue = Queue() + def get_messages(self, count=1, block=True, timeout=0.1): """ Fetch the specified number of messages @@ -375,11 +378,11 @@ class SimpleConsumer(Consumer): def _fetch(self): # Create fetch request payloads for all the partitions requests = [] - partitions = self.offsets.keys() + partitions = self.fetch_offsets.keys() while partitions: for partition in partitions: requests.append(FetchRequest(self.topic, partition, - self.offsets[partition], + self.fetch_offsets[partition], self.buffer_size)) # Send request responses = self.client.send_fetch_request( @@ -394,6 +397,7 @@ class SimpleConsumer(Consumer): for message in resp.messages: # Put the message in our queue self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): From 8b3793a649b470879d2684ad0a127c32a1348682 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 15 Jan 2014 15:54:06 -0800 Subject: [PATCH 3/5] Only use timeout if it's not None --- kafka/consumer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index b202b23..474e1f5 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -315,7 +315,7 @@ class SimpleConsumer(Consumer): it will block forever. """ messages = [] - if timeout: + if timeout is not None: max_time = time.time() + timeout while count > 0 and (timeout is None or timeout > 0): @@ -328,7 +328,7 @@ class SimpleConsumer(Consumer): if not block: # If we're not blocking, break. break - if timeout: + if timeout is not None: # If we're blocking and have a timeout, reduce it to the # appropriate value timeout = max_time - time.time() @@ -610,7 +610,7 @@ class MultiProcessConsumer(Consumer): self.size.value = count self.pause.clear() - if timeout: + if timeout is not None: max_time = time.time() + timeout while count > 0 and (timeout is None or timeout > 0): @@ -633,7 +633,8 @@ class MultiProcessConsumer(Consumer): self.count_since_commit += 1 self._auto_commit() count -= 1 - timeout = max_time - time.time() + if timeout is not None: + timeout = max_time - time.time() self.size.value = 0 self.start.clear() From e0f726204ab0b8b8ae5c29ae07c1aa369a5a6906 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 15 Jan 2014 16:22:41 -0800 Subject: [PATCH 4/5] Make get_messages() update and commit offsets just before returning --- kafka/consumer.py | 51 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 474e1f5..12e1af6 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,6 +1,5 @@ from __future__ import absolute_import -from collections import defaultdict from itertools import izip_longest, repeat import logging import time @@ -318,10 +317,17 @@ class SimpleConsumer(Consumer): if timeout is not None: max_time = time.time() + timeout + new_offsets = {} while count > 0 and (timeout is None or timeout > 0): - message = self.get_message(block, timeout) - if message: - messages.append(message) + result = self._get_message(block, timeout, get_partition_info=True, + update_offset=False) + if result: + partition, message = result + if self.partition_info: + messages.append(result) + else: + messages.append(message) + new_offsets[partition] = message.offset + 1 count -= 1 else: # Ran out of messages for the last request. @@ -333,9 +339,17 @@ class SimpleConsumer(Consumer): # appropriate value timeout = max_time - time.time() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() return messages - def get_message(self, block=True, timeout=0.1): + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): if self.queue.empty(): # We're out of messages, go grab some more. with FetchContext(self, block, timeout): @@ -343,14 +357,17 @@ class SimpleConsumer(Consumer): try: partition, message = self.queue.get_nowait() - # Update partition offset - self.offsets[partition] = message.offset + 1 + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() - if self.partition_info: + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: return partition, message else: return message @@ -613,6 +630,7 @@ class MultiProcessConsumer(Consumer): if timeout is not None: max_time = time.time() + timeout + new_offsets = {} while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not @@ -627,11 +645,7 @@ class MultiProcessConsumer(Consumer): break messages.append(message) - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + 1 - self.count_since_commit += 1 - self._auto_commit() + new_offsets[partition] = message.offset + 1 count -= 1 if timeout is not None: timeout = max_time - time.time() @@ -640,4 +654,9 @@ class MultiProcessConsumer(Consumer): self.start.clear() self.pause.set() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + return messages From bcd5539cb61f948a8ee51efc965c038d6604be9d Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Mon, 20 Jan 2014 11:41:44 -0800 Subject: [PATCH 5/5] Add doc string for SimpleConsumer._get_message() --- kafka/consumer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka/consumer.py b/kafka/consumer.py index 12e1af6..8d2c2b0 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -350,6 +350,12 @@ class SimpleConsumer(Consumer): def _get_message(self, block=True, timeout=0.1, get_partition_info=None, update_offset=True): + """ + If no messages can be fetched, returns None. + If get_partition_info is None, it defaults to self.partition_info + If get_partition_info is True, returns (partition, message) + If get_partition_info is False, returns message + """ if self.queue.empty(): # We're out of messages, go grab some more. with FetchContext(self, block, timeout):