Merge pull request #379 from dpkp/deprecate_keyed_producer_send
Deprecate KeyedProducer.send in favor of send_messages
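For callers, the practical effect is a rename: KeyedProducer.send(topic, key, msg) keeps working but now emits a DeprecationWarning and forwards to send_messages(topic, key, *msgs). A minimal migration sketch, assuming the usual kafka-python client setup (the broker address and topic name below are placeholders, not part of this change):

    from kafka import KafkaClient, KeyedProducer

    kafka = KafkaClient("localhost:9092")  # placeholder broker address
    producer = KeyedProducer(kafka)        # HashedPartitioner is the default

    # Deprecated form: still works, but warns and forwards to send_messages
    # producer.send("my-topic", "key1", "some message")

    # Preferred form; send_messages accepts one or more messages per call
    producer.send_messages("my-topic", "key1", "some message")
    producer.send_messages("my-topic", "key1", "another message", "and a third")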
@@ -63,8 +63,8 @@ Keyed messages
 
 # HashedPartitioner is default
 producer = KeyedProducer(kafka)
-producer.send("my-topic", "key1", "some message")
-producer.send("my-topic", "key2", "this methode")
+producer.send_messages("my-topic", "key1", "some message")
+producer.send_messages("my-topic", "key2", "this methode")
 
 producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 
 import logging
+import warnings
 
 from kafka.partitioner import HashedPartitioner
 from kafka.util import kafka_bytestring
@@ -74,10 +75,10 @@ class KeyedProducer(Producer):
         partition = self._next_partition(topic, key)
         return self._send_messages(topic, partition, *msg, key=key)
 
+    # DEPRECATED
     def send(self, topic, key, msg):
-        topic = kafka_bytestring(topic)
-        partition = self._next_partition(topic, key)
-        return self._send_messages(topic, partition, msg, key=key)
+        warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
+        return self.send_messages(topic, key, msg)
 
     def __repr__(self):
         return '<KeyedProducer batch=%s>' % self.async
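DeprecationWarning is hidden by default on modern Python interpreters, so existing code that still calls send() will continue to run silently. A small sketch, using only the standard library, of how an application could surface (or hard-fail on) the new warning; none of this is added by the patch itself:

    import warnings

    # Print each DeprecationWarning once per call site instead of suppressing it
    warnings.simplefilter("default", DeprecationWarning)

    # Or turn it into an exception to flush out remaining send() callers
    # warnings.simplefilter("error", DeprecationWarning)

    producer.send("my-topic", "key1", "old-style call")  # now warns, then delegates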
@@ -346,10 +346,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
         producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
-        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+        resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+        resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two"))
+        resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three"))
+        resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
 
         self.assert_produce_response(resp1, start_offsets[0]+0)
         self.assert_produce_response(resp2, start_offsets[1]+0)
@@ -367,11 +367,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
         producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
-        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
-        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+        resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one"))
+        resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two"))
+        resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three"))
+        resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four"))
+        resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five"))
 
         offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
         messages = {partitions[0]: [], partitions[1]: []}
@@ -400,7 +400,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
         producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
 
-        resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
+        resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
         self.assertEqual(len(resp), 0)
 
         # wait for the server to report a new highwatermark
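The integration tests above simply switch to the new API; the deprecated wrapper itself is left untested here. If coverage for it is wanted, one portable option (works on Python 2.6+ without relying on unittest.assertWarns) is to record warnings around the old call; this is only a sketch under those assumptions, not part of the committed tests:

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        resp = producer.send(self.topic, self.key("key1"), self.msg("one"))

    # For a synchronous producer, send() should still return one produce response
    self.assertEqual(len(resp), 1)
    # ...and the call should have flagged the deprecation
    self.assertTrue(any(issubclass(w.category, DeprecationWarning) for w in caught))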