From 3a52d18faf0d46d3611e9fc274a03877c88f118e Mon Sep 17 00:00:00 2001 From: John Anderson Date: Fri, 23 Jan 2015 15:39:15 -0800 Subject: [PATCH 1/2] Alert if the topic is not sent as bytes --- kafka/producer/simple.py | 4 ++++ test/test_producer.py | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index afeae06..17e0977 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import random +import six from itertools import cycle @@ -68,6 +69,9 @@ class SimpleProducer(Producer): return next(self.partition_cycles[topic]) def send_messages(self, topic, *msg): + if not isinstance(topic, six.binary_type): + raise TypeError("topic must be type bytes") + partition = self._next_partition(topic) return super(SimpleProducer, self).send_messages(topic, partition, *msg) diff --git a/test/test_producer.py b/test/test_producer.py index caf8fe3..119930f 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -25,3 +25,14 @@ class TestKafkaProducer(unittest.TestCase): # This should not raise an exception producer.send_messages(topic, partition, m) + def test_topic_message_types(self): + from kafka.producer.simple import SimpleProducer + + producer = SimpleProducer(MagicMock()) + topic = "test-topic" + partition = 0 + + def send_message(): + producer.send_messages(topic, partition, b'hi') + + self.assertRaises(TypeError, send_message) From 72bec18adde851510c1c627ce8dee659425f727b Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 24 Jan 2015 16:32:09 -0800 Subject: [PATCH 2/2] Just encode to bytes if it isn't bytes. --- kafka/producer/simple.py | 6 ++++-- test/test_producer.py | 16 ++++++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 17e0977..2699cf2 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -70,10 +70,12 @@ class SimpleProducer(Producer): def send_messages(self, topic, *msg): if not isinstance(topic, six.binary_type): - raise TypeError("topic must be type bytes") + topic = topic.encode('utf-8') partition = self._next_partition(topic) - return super(SimpleProducer, self).send_messages(topic, partition, *msg) + return super(SimpleProducer, self).send_messages( + topic, partition, *msg + ) def __repr__(self): return '' % self.async diff --git a/test/test_producer.py b/test/test_producer.py index 119930f..f6b3d6a 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,6 +7,7 @@ from . import unittest from kafka.producer.base import Producer + class TestKafkaProducer(unittest.TestCase): def test_producer_message_types(self): @@ -28,11 +29,14 @@ class TestKafkaProducer(unittest.TestCase): def test_topic_message_types(self): from kafka.producer.simple import SimpleProducer - producer = SimpleProducer(MagicMock()) - topic = "test-topic" - partition = 0 + client = MagicMock() - def send_message(): - producer.send_messages(topic, partition, b'hi') + def partitions(topic): + return [0, 1] - self.assertRaises(TypeError, send_message) + client.get_partition_ids_for_topic = partitions + + producer = SimpleProducer(client, random_start=False) + topic = b"test-topic" + producer.send_messages(topic, b'hi') + assert client.send_produce_request.called