Merge pull request #418 from scrapinghub/mp_partition_info
allow to retrieve partition info in mp consumer
This commit is contained in:
@@ -83,6 +83,14 @@ class Consumer(object):
|
||||
self._cleanup_func = cleanup
|
||||
atexit.register(cleanup, self)
|
||||
|
||||
self.partition_info = False # Do not return partition info in msgs
|
||||
|
||||
def provide_partition_info(self):
|
||||
"""
|
||||
Indicates that partition info must be returned by the consumer
|
||||
"""
|
||||
self.partition_info = True
|
||||
|
||||
def fetch_last_known_offsets(self, partitions=None):
|
||||
if self.group is None:
|
||||
raise ValueError('KafkaClient.group must not be None')
|
||||
|
||||
@@ -257,7 +257,8 @@ class MultiProcessConsumer(Consumer):
|
||||
except Empty:
|
||||
break
|
||||
|
||||
messages.append(message)
|
||||
_msg = (partition, message) if self.partition_info else message
|
||||
messages.append(_msg)
|
||||
new_offsets[partition] = message.offset + 1
|
||||
count -= 1
|
||||
if timeout is not None:
|
||||
|
||||
@@ -131,7 +131,6 @@ class SimpleConsumer(Consumer):
|
||||
(buffer_size, max_buffer_size))
|
||||
self.buffer_size = buffer_size
|
||||
self.max_buffer_size = max_buffer_size
|
||||
self.partition_info = False # Do not return partition info in msgs
|
||||
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
|
||||
self.fetch_min_bytes = fetch_size_bytes
|
||||
self.fetch_offsets = self.offsets.copy()
|
||||
@@ -182,12 +181,6 @@ class SimpleConsumer(Consumer):
|
||||
self.fetch_offsets[partition] = resp.offsets[0]
|
||||
return resp.offsets[0]
|
||||
|
||||
def provide_partition_info(self):
|
||||
"""
|
||||
Indicates that partition info must be returned by the consumer
|
||||
"""
|
||||
self.partition_info = True
|
||||
|
||||
def seek(self, offset, whence=None, partition=None):
|
||||
"""
|
||||
Alter the current offset in the consumer, similar to fseek
|
||||
|
||||
Reference in New Issue
Block a user