Add more assertions in KafkaConsumer (primarily to seek* methods)
This commit is contained in:
@@ -461,9 +461,11 @@ class KafkaConsumer(six.Iterator):
|
|||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
partition (TopicPartition): partition to check
|
partition (TopicPartition): partition to check
|
||||||
"""
|
|
||||||
assert self._subscription.is_assigned(partition)
|
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: offset
|
||||||
|
"""
|
||||||
|
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
||||||
offset = self._subscription.assignment[partition].position
|
offset = self._subscription.assignment[partition].position
|
||||||
if offset is None:
|
if offset is None:
|
||||||
self._update_fetch_positions(partition)
|
self._update_fetch_positions(partition)
|
||||||
@@ -529,8 +531,13 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Arguments:
|
Arguments:
|
||||||
partition (TopicPartition): partition for seek operation
|
partition (TopicPartition): partition for seek operation
|
||||||
offset (int): message offset in partition
|
offset (int): message offset in partition
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AssertionError: if offset is not an int >= 0; or if partition is not
|
||||||
|
currently assigned.
|
||||||
"""
|
"""
|
||||||
assert offset >= 0
|
assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
|
||||||
|
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
|
||||||
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
||||||
self._subscription.assignment[partition].seek(offset)
|
self._subscription.assignment[partition].seek(offset)
|
||||||
|
|
||||||
@@ -540,9 +547,18 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Arguments:
|
Arguments:
|
||||||
*partitions: optionally provide specific TopicPartitions, otherwise
|
*partitions: optionally provide specific TopicPartitions, otherwise
|
||||||
default to all assigned partitions
|
default to all assigned partitions
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AssertionError: if any partition is not currently assigned, or if
|
||||||
|
no partitions are assigned
|
||||||
"""
|
"""
|
||||||
if not partitions:
|
if not partitions:
|
||||||
partitions = self._subscription.assigned_partitions()
|
partitions = self._subscription.assigned_partitions()
|
||||||
|
assert partitions, 'No partitions are currently assigned'
|
||||||
|
else:
|
||||||
|
for p in partitions:
|
||||||
|
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
|
||||||
|
|
||||||
for tp in partitions:
|
for tp in partitions:
|
||||||
log.debug("Seeking to beginning of partition %s", tp)
|
log.debug("Seeking to beginning of partition %s", tp)
|
||||||
self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
|
self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
|
||||||
@@ -553,9 +569,18 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Arguments:
|
Arguments:
|
||||||
*partitions: optionally provide specific TopicPartitions, otherwise
|
*partitions: optionally provide specific TopicPartitions, otherwise
|
||||||
default to all assigned partitions
|
default to all assigned partitions
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AssertionError: if any partition is not currently assigned, or if
|
||||||
|
no partitions are assigned
|
||||||
"""
|
"""
|
||||||
if not partitions:
|
if not partitions:
|
||||||
partitions = self._subscription.assigned_partitions()
|
partitions = self._subscription.assigned_partitions()
|
||||||
|
assert partitions, 'No partitions are currently assigned'
|
||||||
|
else:
|
||||||
|
for p in partitions:
|
||||||
|
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
|
||||||
|
|
||||||
for tp in partitions:
|
for tp in partitions:
|
||||||
log.debug("Seeking to end of partition %s", tp)
|
log.debug("Seeking to end of partition %s", tp)
|
||||||
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
|
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
|
||||||
|
|||||||
Reference in New Issue
Block a user