Merge pull request #409 from scrapinghub/feature-allow-null-payload
Allow null payload for deletion feature
This commit is contained in:
@@ -355,9 +355,15 @@ class Producer(object):
|
||||
if not isinstance(msg, (list, tuple)):
|
||||
raise TypeError("msg is not a list or tuple!")
|
||||
|
||||
# Raise TypeError if any message is not encoded as bytes
|
||||
if any(not isinstance(m, six.binary_type) for m in msg):
|
||||
raise TypeError("all produce message payloads must be type bytes")
|
||||
for m in msg:
|
||||
# The protocol allows to have key & payload with null values both,
|
||||
# (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense.
|
||||
if m is None:
|
||||
if key is None:
|
||||
raise TypeError("key and payload can't be null in one")
|
||||
# Raise TypeError if any non-null message is not encoded as bytes
|
||||
elif not isinstance(m, six.binary_type):
|
||||
raise TypeError("all produce message payloads must be null or type bytes")
|
||||
|
||||
# Raise TypeError if topic is not encoded as bytes
|
||||
if not isinstance(topic, six.binary_type):
|
||||
|
||||
@@ -7,7 +7,7 @@ import time
|
||||
from mock import MagicMock, patch
|
||||
from . import unittest
|
||||
|
||||
from kafka import KafkaClient, SimpleProducer
|
||||
from kafka import KafkaClient, SimpleProducer, KeyedProducer
|
||||
from kafka.common import (
|
||||
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
|
||||
ProduceResponse, RetryOptions, TopicAndPartition
|
||||
@@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
topic = b"test-topic"
|
||||
partition = 0
|
||||
|
||||
bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
|
||||
bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
|
||||
('a', 'tuple'), {'a': 'dict'}, None,)
|
||||
for m in bad_data_types:
|
||||
with self.assertRaises(TypeError):
|
||||
logging.debug("attempting to send message of type %s", type(m))
|
||||
@@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase):
|
||||
# This should not raise an exception
|
||||
producer.send_messages(topic, partition, m)
|
||||
|
||||
def test_keyedproducer_message_types(self):
|
||||
client = MagicMock()
|
||||
client.get_partition_ids_for_topic.return_value = [0, 1]
|
||||
producer = KeyedProducer(client)
|
||||
topic = b"test-topic"
|
||||
key = b"testkey"
|
||||
|
||||
bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
|
||||
('a', 'tuple'), {'a': 'dict'},)
|
||||
for m in bad_data_types:
|
||||
with self.assertRaises(TypeError):
|
||||
logging.debug("attempting to send message of type %s", type(m))
|
||||
producer.send_messages(topic, key, m)
|
||||
|
||||
good_data_types = (b'a string!', None,)
|
||||
for m in good_data_types:
|
||||
# This should not raise an exception
|
||||
producer.send_messages(topic, key, m)
|
||||
|
||||
def test_topic_message_types(self):
|
||||
client = MagicMock()
|
||||
|
||||
|
||||
@@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
# KeyedProducer Tests #
|
||||
############################
|
||||
|
||||
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
|
||||
def test_keyedproducer_null_payload(self):
|
||||
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
||||
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
|
||||
|
||||
producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
|
||||
key = "test"
|
||||
|
||||
resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
|
||||
self.assert_produce_response(resp, start_offsets[0])
|
||||
resp = producer.send_messages(self.topic, self.key("key2"), None)
|
||||
self.assert_produce_response(resp, start_offsets[1])
|
||||
resp = producer.send_messages(self.topic, self.key("key3"), None)
|
||||
self.assert_produce_response(resp, start_offsets[0]+1)
|
||||
resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
|
||||
self.assert_produce_response(resp, start_offsets[1]+1)
|
||||
|
||||
self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
|
||||
self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_round_robin_partitioner(self):
|
||||
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
||||
|
||||
Reference in New Issue
Block a user