Use list comprehension on _topics in KafkaConsumer.fetch_messages
This commit is contained in:
@@ -312,16 +312,16 @@ class KafkaConsumer(object):
|
||||
max_wait_time = self._config['fetch_wait_max_ms']
|
||||
min_bytes = self._config['fetch_min_bytes']
|
||||
|
||||
# Get current fetch offsets
|
||||
offsets = self._offsets.fetch
|
||||
if not offsets:
|
||||
if not self._topics:
|
||||
raise KafkaConfigurationError('No topics or partitions configured')
|
||||
if not self._topics:
|
||||
raise KafkaConfigurationError('No topics or partitions configured')
|
||||
|
||||
if not self._offsets.fetch:
|
||||
raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
|
||||
|
||||
fetches = []
|
||||
for topic_partition, offset in six.iteritems(offsets):
|
||||
fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
|
||||
fetches = [FetchRequest(topic, partition,
|
||||
self._offsets.fetch[(topic, partition)],
|
||||
max_bytes)
|
||||
for (topic, partition) in self._topics]
|
||||
|
||||
# client.send_fetch_request will collect topic/partition requests by leader
|
||||
# and send each group as a single FetchRequest to the correct broker
|
||||
|
||||
Reference in New Issue
Block a user