Raise TypeError in kafka.producer.send_messages if any msg is not a str (or subclass); document

This commit is contained in:
Dana Powers
2014-08-26 13:15:25 -07:00
parent f7be23f6a7
commit a28120aa8b
2 changed files with 28 additions and 0 deletions

View File

@@ -42,9 +42,14 @@ kafka = KafkaClient("localhost:9092")
# To send messages synchronously
producer = SimpleProducer(kafka)
# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")
# Send unicode message
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
# To send messages asynchronously
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")
@@ -78,6 +83,8 @@ producer = SimpleProducer(kafka, batch_send=True,
# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
# message is raw byte string -- decode if necessary!
# e.g., for unicode: `message.decode('utf-8')`
print(message)
kafka.close()

View File

@@ -148,7 +148,28 @@ class Producer(object):
def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
@param: topic, name of topic for produce request -- type str
@param: partition, partition number for produce request -- type int
@param: *msg, one or more message payloads -- type str
@returns: ResponseRequest returned by server
raises on error
Note that msg type *must* be encoded to str by user.
Passing unicode message will not work, for example
you should encode before calling send_messages via
something like `unicode_message.encode('utf-8')`
All messages produced via this method will set the message 'key' to Null
"""
# Guarantee that msg is actually a list or tuple (should always be true)
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
# Raise TypeError if any message is not encoded as a str
if any(not isinstance(m, str) for m in msg):
raise TypeError("all produce message payloads must be type str")
if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m))