Use kafka.util.kafka_bytestring to encode utf-8 when necessary
This commit is contained in:
@@ -15,6 +15,7 @@ from kafka.common import (
|
||||
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
|
||||
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
|
||||
)
|
||||
from kafka.util import kafka_bytestring
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -225,18 +226,14 @@ class KafkaConsumer(object):
|
||||
|
||||
# Topic name str -- all partitions
|
||||
if isinstance(arg, (six.string_types, six.binary_type)):
|
||||
topic = arg
|
||||
if isinstance(topic, six.string_types):
|
||||
topic = topic.encode('utf-8')
|
||||
topic = kafka_bytestring(arg)
|
||||
|
||||
for partition in self._client.get_partition_ids_for_topic(arg):
|
||||
self._consume_topic_partition(topic, partition)
|
||||
|
||||
# (topic, partition [, offset]) tuple
|
||||
elif isinstance(arg, tuple):
|
||||
topic = arg[0]
|
||||
if isinstance(topic, six.string_types):
|
||||
topic = topic.encode('utf-8')
|
||||
topic = kafka_bytestring(arg[0])
|
||||
partition = arg[1]
|
||||
if len(arg) == 3:
|
||||
offset = arg[2]
|
||||
@@ -249,9 +246,7 @@ class KafkaConsumer(object):
|
||||
|
||||
# key can be string (a topic)
|
||||
if isinstance(key, (six.string_types, six.binary_type)):
|
||||
topic = key
|
||||
if isinstance(topic, six.string_types):
|
||||
topic = topic.encode('utf-8')
|
||||
topic = kafka_bytestring(key)
|
||||
|
||||
# topic: partition
|
||||
if isinstance(value, int):
|
||||
@@ -267,9 +262,7 @@ class KafkaConsumer(object):
|
||||
|
||||
# (topic, partition): offset
|
||||
elif isinstance(key, tuple):
|
||||
topic = key[0]
|
||||
if isinstance(topic, six.string_types):
|
||||
topic = topic.encode('utf-8')
|
||||
topic = kafka_bytestring(key[0])
|
||||
partition = key[1]
|
||||
self._consume_topic_partition(topic, partition)
|
||||
self._offsets.fetch[key] = value
|
||||
@@ -562,9 +555,7 @@ class KafkaConsumer(object):
|
||||
#
|
||||
|
||||
def _consume_topic_partition(self, topic, partition):
|
||||
if not isinstance(topic, six.binary_type):
|
||||
raise KafkaConfigurationError('Unknown topic type (%s) '
|
||||
'-- expected bytes' % type(topic))
|
||||
topic = kafka_bytestring(topic)
|
||||
if not isinstance(partition, int):
|
||||
raise KafkaConfigurationError('Unknown partition type (%s) '
|
||||
'-- expected int' % type(partition))
|
||||
|
@@ -86,6 +86,18 @@ def group_by_topic_and_partition(tuples):
|
||||
return out
|
||||
|
||||
|
||||
def kafka_bytestring(s):
    """
    Coerce a string or bytes instance to bytes.

    Bytes are handed back unchanged; text strings are encoded as utf-8.
    Anything else raises TypeError with the offending value as its argument.
    """
    # The bytes check deliberately comes first: a value that already is a
    # byte string must be passed through as-is, never re-encoded.
    if not isinstance(s, six.binary_type):
        if not isinstance(s, six.string_types):
            raise TypeError(s)
        s = s.encode('utf-8')
    return s
|
||||
|
||||
|
||||
class ReentrantTimer(object):
|
||||
"""
|
||||
A timer that can be restarted, unlike threading.Timer
|
||||
|
Reference in New Issue
Block a user