Merge pull request #211 from dpkp/str_only_produce_msgs
Strictly enforce str message type in producer.send_messages
This commit is contained in:
@@ -42,9 +42,14 @@ kafka = KafkaClient("localhost:9092")
|
||||
|
||||
# To send messages synchronously
|
||||
producer = SimpleProducer(kafka)
|
||||
|
||||
# Note that the application is responsible for encoding messages to type str
|
||||
producer.send_messages("my-topic", "some message")
|
||||
producer.send_messages("my-topic", "this method", "is variadic")
|
||||
|
||||
# Send unicode message
|
||||
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
|
||||
|
||||
# To send messages asynchronously
|
||||
producer = SimpleProducer(kafka, async=True)
|
||||
producer.send_messages("my-topic", "async message")
|
||||
@@ -78,6 +83,8 @@ producer = SimpleProducer(kafka, batch_send=True,
|
||||
# To consume messages
|
||||
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
|
||||
for message in consumer:
|
||||
# message is raw byte string -- decode if necessary!
|
||||
# e.g., for unicode: `message.decode('utf-8')`
|
||||
print(message)
|
||||
|
||||
kafka.close()
|
||||
|
||||
@@ -148,7 +148,28 @@ class Producer(object):
|
||||
def send_messages(self, topic, partition, *msg):
|
||||
"""
|
||||
Helper method to send produce requests
|
||||
@param: topic, name of topic for produce request -- type str
|
||||
@param: partition, partition number for produce request -- type int
|
||||
@param: *msg, one or more message payloads -- type str
|
||||
@returns: ResponseRequest returned by server
|
||||
raises on error
|
||||
|
||||
Note that msg type *must* be encoded to str by user.
|
||||
Passing unicode message will not work, for example
|
||||
you should encode before calling send_messages via
|
||||
something like `unicode_message.encode('utf-8')`
|
||||
|
||||
All messages produced via this method will set the message 'key' to Null
|
||||
"""
|
||||
|
||||
# Guarantee that msg is actually a list or tuple (should always be true)
|
||||
if not isinstance(msg, (list, tuple)):
|
||||
raise TypeError("msg is not a list or tuple!")
|
||||
|
||||
# Raise TypeError if any message is not encoded as a str
|
||||
if any(not isinstance(m, str) for m in msg):
|
||||
raise TypeError("all produce message payloads must be type str")
|
||||
|
||||
if self.async:
|
||||
for m in msg:
|
||||
self.queue.put((TopicAndPartition(topic, partition), m))
|
||||
|
||||
31
test/test_producer.py
Normal file
31
test/test_producer.py
Normal file
@@ -0,0 +1,31 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import struct
|
||||
import unittest2
|
||||
|
||||
from mock import MagicMock, patch
|
||||
|
||||
from kafka import KafkaClient
|
||||
from kafka.producer import Producer
|
||||
|
||||
class TestKafkaProducer(unittest2.TestCase):
|
||||
def test_producer_message_types(self):
|
||||
|
||||
producer = Producer(MagicMock())
|
||||
topic = "test-topic"
|
||||
partition = 0
|
||||
|
||||
bad_data_types = (u'你怎么样?', 12, ['a','list'], ('a','tuple'), {'a': 'dict'})
|
||||
for m in bad_data_types:
|
||||
with self.assertRaises(TypeError):
|
||||
logging.debug("attempting to send message of type %s", type(m))
|
||||
producer.send_messages(topic, partition, m)
|
||||
|
||||
good_data_types = ('a string!',)
|
||||
for m in good_data_types:
|
||||
# This should not raise an exception
|
||||
producer.send_messages(topic, partition, m)
|
||||
|
||||
Reference in New Issue
Block a user