43 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			43 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
import logging
 | 
						|
 | 
						|
from mock import MagicMock
 | 
						|
from . import unittest
 | 
						|
 | 
						|
from kafka.producer.base import Producer
 | 
						|
 | 
						|
 | 
						|
class TestKafkaProducer(unittest.TestCase):
 | 
						|
    def test_producer_message_types(self):
 | 
						|
 | 
						|
        producer = Producer(MagicMock())
 | 
						|
        topic = b"test-topic"
 | 
						|
        partition = 0
 | 
						|
 | 
						|
        bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
 | 
						|
        for m in bad_data_types:
 | 
						|
            with self.assertRaises(TypeError):
 | 
						|
                logging.debug("attempting to send message of type %s", type(m))
 | 
						|
                producer.send_messages(topic, partition, m)
 | 
						|
 | 
						|
        good_data_types = (b'a string!',)
 | 
						|
        for m in good_data_types:
 | 
						|
            # This should not raise an exception
 | 
						|
            producer.send_messages(topic, partition, m)
 | 
						|
 | 
						|
    def test_topic_message_types(self):
 | 
						|
        from kafka.producer.simple import SimpleProducer
 | 
						|
 | 
						|
        client = MagicMock()
 | 
						|
 | 
						|
        def partitions(topic):
 | 
						|
            return [0, 1]
 | 
						|
 | 
						|
        client.get_partition_ids_for_topic = partitions
 | 
						|
 | 
						|
        producer = SimpleProducer(client, random_start=False)
 | 
						|
        topic = b"test-topic"
 | 
						|
        producer.send_messages(topic, b'hi')
 | 
						|
        assert client.send_produce_request.called
 |