Merge pull request #489 from dpkp/producer_new_topic
Handle new topic auto-creation in SimpleProducer.send_messages
This commit is contained in:
@@ -33,7 +33,7 @@ class SimpleProducer(Producer):
|
|||||||
def _next_partition(self, topic):
|
def _next_partition(self, topic):
|
||||||
if topic not in self.partition_cycles:
|
if topic not in self.partition_cycles:
|
||||||
if not self.client.has_metadata_for_topic(topic):
|
if not self.client.has_metadata_for_topic(topic):
|
||||||
self.client.load_metadata_for_topics(topic)
|
self.client.ensure_topic_exists(topic)
|
||||||
|
|
||||||
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
|
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
|
||||||
|
|
||||||
|
@@ -97,19 +97,20 @@ class TestKafkaProducer(unittest.TestCase):
|
|||||||
def test_producer_sync_fail_on_error(self):
|
def test_producer_sync_fail_on_error(self):
|
||||||
error = FailedPayloadsError('failure')
|
error = FailedPayloadsError('failure')
|
||||||
with patch.object(KafkaClient, 'load_metadata_for_topics'):
|
with patch.object(KafkaClient, 'load_metadata_for_topics'):
|
||||||
with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
|
with patch.object(KafkaClient, 'ensure_topic_exists'):
|
||||||
with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
|
with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
|
||||||
|
with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
|
||||||
|
|
||||||
client = KafkaClient(MagicMock())
|
client = KafkaClient(MagicMock())
|
||||||
producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
|
producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
|
||||||
|
|
||||||
# This should not raise
|
# This should not raise
|
||||||
(response,) = producer.send_messages('foobar', b'test message')
|
(response,) = producer.send_messages('foobar', b'test message')
|
||||||
self.assertEqual(response, error)
|
self.assertEqual(response, error)
|
||||||
|
|
||||||
producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
|
producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
|
||||||
with self.assertRaises(FailedPayloadsError):
|
with self.assertRaises(FailedPayloadsError):
|
||||||
producer.send_messages('foobar', b'test message')
|
producer.send_messages('foobar', b'test message')
|
||||||
|
|
||||||
def test_cleanup_is_not_called_on_stopped_producer(self):
|
def test_cleanup_is_not_called_on_stopped_producer(self):
|
||||||
producer = Producer(MagicMock(), async=True)
|
producer = Producer(MagicMock(), async=True)
|
||||||
|
@@ -133,6 +133,12 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
# SimpleProducer Tests #
|
# SimpleProducer Tests #
|
||||||
############################
|
############################
|
||||||
|
|
||||||
|
def test_simple_producer_new_topic(self):
|
||||||
|
producer = SimpleProducer(self.client)
|
||||||
|
resp = producer.send_messages('new_topic', self.msg('foobar'))
|
||||||
|
self.assert_produce_response(resp, 0)
|
||||||
|
producer.stop()
|
||||||
|
|
||||||
def test_simple_producer(self):
|
def test_simple_producer(self):
|
||||||
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
||||||
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
|
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
|
||||||
@@ -157,15 +163,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
producer.stop()
|
producer.stop()
|
||||||
|
|
||||||
def test_produce__new_topic_fails_with_reasonable_error(self):
|
|
||||||
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
|
|
||||||
producer = SimpleProducer(self.client, random_start=False)
|
|
||||||
|
|
||||||
# At first it doesn't exist
|
|
||||||
with self.assertRaises((UnknownTopicOrPartitionError,
|
|
||||||
LeaderNotAvailableError)):
|
|
||||||
producer.send_messages(new_topic, self.msg("one"))
|
|
||||||
|
|
||||||
def test_producer_random_order(self):
|
def test_producer_random_order(self):
|
||||||
producer = SimpleProducer(self.client, random_start=True)
|
producer = SimpleProducer(self.client, random_start=True)
|
||||||
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
|
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
|
||||||
|
Reference in New Issue
Block a user