Convert partition key to str
In kafka 0.9.5 the partition key needs to be a six.binary_type which is a str in Python 2.6 Change-Id: I423694d635521d286bfdafc2172b31f3dca21860
This commit is contained in:
parent
fbf46dca69
commit
ca22e64edd
@ -40,22 +40,13 @@ class KafkaProducer(object):
|
|||||||
"""Takes messages and puts them on the supplied kafka topic
|
"""Takes messages and puts them on the supplied kafka topic
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Using a key producer to make sure we can distribute messages evenly
|
|
||||||
# across all partitions. In the kafka-python library, as of version
|
|
||||||
# 0.9.2, it doesn't support sending message batches for keyed
|
|
||||||
# producers. Batching writes to kafka is important for performance so
|
|
||||||
# we have to work around this limitation. Using the _next_partition
|
|
||||||
# function allows us to get proper distribution and the speed of the
|
|
||||||
# send_messages function.
|
|
||||||
|
|
||||||
if not isinstance(messages, list):
|
if not isinstance(messages, list):
|
||||||
messages = [messages]
|
messages = [messages]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if key is None:
|
if key is None:
|
||||||
key = time.time() * 1000
|
key = int(time.time() * 1000)
|
||||||
partition = self._producer._next_partition(topic, key)
|
self._producer.send_messages(topic, str(key), *messages)
|
||||||
self._producer.send_messages(topic, partition, *messages)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception('Error publishing to {} topic.'.format(topic))
|
log.exception('Error publishing to {} topic.'.format(topic))
|
||||||
raise
|
raise
|
||||||
|
Loading…
Reference in New Issue
Block a user