Merge pull request #587 from dpkp/topic_partition_type_error
Raise TypeError in KafkaConsumer when partition is not a TopicPartition
This commit is contained in:
@@ -7,6 +7,7 @@ import time
|
|||||||
import six
|
import six
|
||||||
|
|
||||||
from kafka.client_async import KafkaClient
|
from kafka.client_async import KafkaClient
|
||||||
|
from kafka.common import TopicPartition
|
||||||
from kafka.consumer.fetcher import Fetcher
|
from kafka.consumer.fetcher import Fetcher
|
||||||
from kafka.consumer.subscription_state import SubscriptionState
|
from kafka.consumer.subscription_state import SubscriptionState
|
||||||
from kafka.coordinator.consumer import ConsumerCoordinator
|
from kafka.coordinator.consumer import ConsumerCoordinator
|
||||||
@@ -344,6 +345,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
"""
|
"""
|
||||||
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
|
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
|
||||||
assert self.config['group_id'] is not None, 'Requires group_id'
|
assert self.config['group_id'] is not None, 'Requires group_id'
|
||||||
|
if not isinstance(partition, TopicPartition):
|
||||||
|
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||||
if self._subscription.is_assigned(partition):
|
if self._subscription.is_assigned(partition):
|
||||||
committed = self._subscription.assignment[partition].committed
|
committed = self._subscription.assignment[partition].committed
|
||||||
if committed is None:
|
if committed is None:
|
||||||
@@ -474,6 +477,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Returns:
|
Returns:
|
||||||
int: offset
|
int: offset
|
||||||
"""
|
"""
|
||||||
|
if not isinstance(partition, TopicPartition):
|
||||||
|
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||||
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
||||||
offset = self._subscription.assignment[partition].position
|
offset = self._subscription.assignment[partition].position
|
||||||
if offset is None:
|
if offset is None:
|
||||||
@@ -500,6 +505,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Returns:
|
Returns:
|
||||||
int or None: offset if available
|
int or None: offset if available
|
||||||
"""
|
"""
|
||||||
|
if not isinstance(partition, TopicPartition):
|
||||||
|
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||||
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
|
||||||
return self._subscription.assignment[partition].highwater
|
return self._subscription.assignment[partition].highwater
|
||||||
|
|
||||||
@@ -514,6 +521,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Arguments:
|
Arguments:
|
||||||
*partitions (TopicPartition): partitions to pause
|
*partitions (TopicPartition): partitions to pause
|
||||||
"""
|
"""
|
||||||
|
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||||
|
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||||
for partition in partitions:
|
for partition in partitions:
|
||||||
log.debug("Pausing partition %s", partition)
|
log.debug("Pausing partition %s", partition)
|
||||||
self._subscription.pause(partition)
|
self._subscription.pause(partition)
|
||||||
@@ -524,6 +533,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
Arguments:
|
Arguments:
|
||||||
*partitions (TopicPartition): partitions to resume
|
*partitions (TopicPartition): partitions to resume
|
||||||
"""
|
"""
|
||||||
|
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||||
|
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||||
for partition in partitions:
|
for partition in partitions:
|
||||||
log.debug("Resuming partition %s", partition)
|
log.debug("Resuming partition %s", partition)
|
||||||
self._subscription.resume(partition)
|
self._subscription.resume(partition)
|
||||||
@@ -545,6 +556,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
AssertionError: if offset is not an int >= 0; or if partition is not
|
AssertionError: if offset is not an int >= 0; or if partition is not
|
||||||
currently assigned.
|
currently assigned.
|
||||||
"""
|
"""
|
||||||
|
if not isinstance(partition, TopicPartition):
|
||||||
|
raise TypeError('partition must be a TopicPartition namedtuple')
|
||||||
assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
|
assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
|
||||||
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
|
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
|
||||||
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
||||||
@@ -561,6 +574,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
AssertionError: if any partition is not currently assigned, or if
|
AssertionError: if any partition is not currently assigned, or if
|
||||||
no partitions are assigned
|
no partitions are assigned
|
||||||
"""
|
"""
|
||||||
|
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||||
|
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||||
if not partitions:
|
if not partitions:
|
||||||
partitions = self._subscription.assigned_partitions()
|
partitions = self._subscription.assigned_partitions()
|
||||||
assert partitions, 'No partitions are currently assigned'
|
assert partitions, 'No partitions are currently assigned'
|
||||||
@@ -583,6 +598,8 @@ class KafkaConsumer(six.Iterator):
|
|||||||
AssertionError: if any partition is not currently assigned, or if
|
AssertionError: if any partition is not currently assigned, or if
|
||||||
no partitions are assigned
|
no partitions are assigned
|
||||||
"""
|
"""
|
||||||
|
if not all([isinstance(p, TopicPartition) for p in partitions]):
|
||||||
|
raise TypeError('partitions must be TopicPartition namedtuples')
|
||||||
if not partitions:
|
if not partitions:
|
||||||
partitions = self._subscription.assigned_partitions()
|
partitions = self._subscription.assigned_partitions()
|
||||||
assert partitions, 'No partitions are currently assigned'
|
assert partitions, 'No partitions are currently assigned'
|
||||||
|
|||||||
Reference in New Issue
Block a user