Bug fix: KafkaConsumer.position()

Method KafkaConsumer.position() was not refreshing position correctly
when calling self._update_fetch_positions().
This commit is contained in:
Stefán Þorvarðarson
2016-03-09 11:50:29 +00:00
parent d5f31e6e3c
commit b787a2211e

View File

@@ -477,7 +477,7 @@ class KafkaConsumer(six.Iterator):
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
offset = self._subscription.assignment[partition].position
if offset is None:
self._update_fetch_positions(partition)
self._update_fetch_positions([partition])
offset = self._subscription.assignment[partition].position
return offset