Merge pull request #306 from sontek/catch_topic_not_bytes
Alert if the topic is not sent as bytes
This commit is contained in:
@@ -2,6 +2,7 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
import random
|
||||
import six
|
||||
|
||||
from itertools import cycle
|
||||
|
||||
@@ -68,8 +69,13 @@ class SimpleProducer(Producer):
|
||||
return next(self.partition_cycles[topic])
|
||||
|
||||
def send_messages(self, topic, *msg):
|
||||
if not isinstance(topic, six.binary_type):
|
||||
topic = topic.encode('utf-8')
|
||||
|
||||
partition = self._next_partition(topic)
|
||||
return super(SimpleProducer, self).send_messages(topic, partition, *msg)
|
||||
return super(SimpleProducer, self).send_messages(
|
||||
topic, partition, *msg
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return '<SimpleProducer batch=%s>' % self.async
|
||||
|
@@ -7,6 +7,7 @@ from . import unittest
|
||||
|
||||
from kafka.producer.base import Producer
|
||||
|
||||
|
||||
class TestKafkaProducer(unittest.TestCase):
|
||||
def test_producer_message_types(self):
|
||||
|
||||
@@ -25,3 +26,17 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
# This should not raise an exception
|
||||
producer.send_messages(topic, partition, m)
|
||||
|
||||
def test_topic_message_types(self):
|
||||
from kafka.producer.simple import SimpleProducer
|
||||
|
||||
client = MagicMock()
|
||||
|
||||
def partitions(topic):
|
||||
return [0, 1]
|
||||
|
||||
client.get_partition_ids_for_topic = partitions
|
||||
|
||||
producer = SimpleProducer(client, random_start=False)
|
||||
topic = b"test-topic"
|
||||
producer.send_messages(topic, b'hi')
|
||||
assert client.send_produce_request.called
|
||||
|
Reference in New Issue
Block a user