Raise TypeError in KafkaConsumer when partition is not a TopicPartition
This commit is contained in:
@@ -7,6 +7,7 @@ import time
|
||||
import six
|
||||
|
||||
from kafka.client_async import KafkaClient
|
||||
from kafka.common import TopicPartition
|
||||
from kafka.consumer.fetcher import Fetcher
|
||||
from kafka.consumer.subscription_state import SubscriptionState
|
||||
from kafka.coordinator.consumer import ConsumerCoordinator
|
||||
@@ -344,6 +345,8 @@ class KafkaConsumer(six.Iterator):
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
|
||||
assert self.config['group_id'] is not None, 'Requires group_id'
|
||||
if not isinstance(partition, TopicPartition):
|
||||
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||
if self._subscription.is_assigned(partition):
|
||||
committed = self._subscription.assignment[partition].committed
|
||||
if committed is None:
|
||||
@@ -474,6 +477,8 @@ class KafkaConsumer(six.Iterator):
|
||||
Returns:
|
||||
int: offset
|
||||
"""
|
||||
if not isinstance(partition, TopicPartition):
|
||||
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
||||
offset = self._subscription.assignment[partition].position
|
||||
if offset is None:
|
||||
@@ -500,6 +505,8 @@ class KafkaConsumer(six.Iterator):
|
||||
Returns:
|
||||
int or None: offset if available
|
||||
"""
|
||||
if not isinstance(partition, TopicPartition):
|
||||
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
||||
return self._subscription.assignment[partition].highwater
|
||||
|
||||
@@ -514,6 +521,8 @@ class KafkaConsumer(six.Iterator):
|
||||
Arguments:
|
||||
*partitions (TopicPartition): partitions to pause
|
||||
"""
|
||||
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||
for partition in partitions:
|
||||
log.debug("Pausing partition %s", partition)
|
||||
self._subscription.pause(partition)
|
||||
@@ -524,6 +533,8 @@ class KafkaConsumer(six.Iterator):
|
||||
Arguments:
|
||||
*partitions (TopicPartition): partitions to resume
|
||||
"""
|
||||
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||
for partition in partitions:
|
||||
log.debug("Resuming partition %s", partition)
|
||||
self._subscription.resume(partition)
|
||||
@@ -545,6 +556,8 @@ class KafkaConsumer(six.Iterator):
|
||||
AssertionError: if offset is not an int >= 0; or if partition is not
|
||||
currently assigned.
|
||||
"""
|
||||
if not isinstance(partition, TopicPartition):
|
||||
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||
assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
|
||||
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
|
||||
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
||||
@@ -561,6 +574,8 @@ class KafkaConsumer(six.Iterator):
|
||||
AssertionError: if any partition is not currently assigned, or if
|
||||
no partitions are assigned
|
||||
"""
|
||||
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||
if not partitions:
|
||||
partitions = self._subscription.assigned_partitions()
|
||||
assert partitions, 'No partitions are currently assigned'
|
||||
@@ -583,6 +598,8 @@ class KafkaConsumer(six.Iterator):
|
||||
AssertionError: if any partition is not currently assigned, or if
|
||||
no partitions are assigned
|
||||
"""
|
||||
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||
if not partitions:
|
||||
partitions = self._subscription.assigned_partitions()
|
||||
assert partitions, 'No partitions are currently assigned'
|
||||
|
||||
Reference in New Issue
Block a user