Merge pull request #111 from rdiomar/multitopic_producers

Make producers take a topic argument at send rather than init time -- fixes Issue #110, but breaks backwards compatibility with previous Producer interface.
Dana Powers
2014-01-30 17:00:55 -08:00
3 changed files with 98 additions and 89 deletions
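For code migrating across this change, the shape of the fix is the same everywhere: drop the topic from the producer constructor and pass it on each send. A minimal before/after sketch against the new interface (the broker address and topic names are illustrative):

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost", 9092)

# Before: the topic was bound once, at construction time
#   producer = SimpleProducer(kafka, "my-topic")
#   producer.send_messages("some message")

# After: the topic is an argument to every send, so one producer
# instance can serve any number of topics
producer = SimpleProducer(kafka)
producer.send_messages("my-topic", "some message")
producer.send_messages("other-topic", "another message")
```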

README.md

@@ -32,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
 kafka = KafkaClient("localhost", 9092)
 
 # To send messages synchronously
-producer = SimpleProducer(kafka, "my-topic")
-producer.send_messages("some message")
-producer.send_messages("this method", "is variadic")
+producer = SimpleProducer(kafka)
+producer.send_messages("my-topic", "some message")
+producer.send_messages("my-topic", "this method", "is variadic")
 
 # To send messages asynchronously
-producer = SimpleProducer(kafka, "my-topic", async=True)
-producer.send_messages("async message")
+producer = SimpleProducer(kafka, async=True)
+producer.send_messages("my-topic", "async message")
 
 # To wait for acknowledgements
 # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
 #                         a local log before sending response
 # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
 #                            by all in sync replicas before sending a response
-producer = SimpleProducer(kafka, "my-topic", async=False,
+producer = SimpleProducer(kafka, async=False,
                           req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                           ack_timeout=2000)
-response = producer.send_messages("async message")
+response = producer.send_messages("my-topic", "async message")
 
 if response:
     print(response[0].error)

@@ -62,7 +62,7 @@ if response:
 # Notes:
 # * If the producer dies before the messages are sent, there will be losses
 # * Call producer.stop() to send the messages and cleanup
-producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+producer = SimpleProducer(kafka, batch_send=True,
                           batch_send_every_n=20,
                           batch_send_every_t=60)

@@ -83,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
 kafka = KafkaClient("localhost", 9092)
 
 # HashedPartitioner is default
-producer = KeyedProducer(kafka, "my-topic")
-producer.send("key1", "some message")
-producer.send("key2", "this methode")
+producer = KeyedProducer(kafka)
+producer.send("my-topic", "key1", "some message")
+producer.send("my-topic", "key2", "this methode")
 
-producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
+producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 ```
 
 ## Multiprocess consumer

kafka/producer.py

@@ -8,7 +8,7 @@ from collections import defaultdict
 from itertools import cycle
 from multiprocessing import Queue, Process
 
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
 from kafka.partitioner import HashedPartitioner
 from kafka.protocol import create_message

@@ -20,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20
 STOP_ASYNC_PRODUCER = -1
 
 
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
                    req_acks, ack_timeout):
     """
     Listen on the queue for a specified number of messages or till

@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
         # timeout is reached
         while count > 0 and timeout >= 0:
             try:
-                partition, msg = queue.get(timeout=timeout)
+                topic_partition, msg = queue.get(timeout=timeout)
             except Empty:
                 break
 
             # Check if the controller has requested us to stop
-            if partition == STOP_ASYNC_PRODUCER:
+            if topic_partition == STOP_ASYNC_PRODUCER:
                 stop = True
                 break
 
             # Adjust the timeout to match the remaining period
             count -= 1
             timeout = send_at - time.time()
-            msgset[partition].append(msg)
+            msgset[topic_partition].append(msg)
 
         # Send collected requests upstream
         reqs = []
-        for partition, messages in msgset.items():
-            req = ProduceRequest(topic, partition, messages)
+        for topic_partition, messages in msgset.items():
+            req = ProduceRequest(topic_partition.topic,
+                                 topic_partition.partition,
+                                 messages)
             reqs.append(req)
 
         try:

@@ -78,7 +81,6 @@ class Producer(object):
     Params:
 
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
    async - If set to true, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
    req_acks - A value indicating the acknowledgements that the server must

@@ -119,8 +121,7 @@ class Producer(object):
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
-                                args=(self.topic,
-                                      self.queue,
+                                args=(self.queue,
                                       self.client.copy(),
                                       batch_send_every_t,
                                       batch_send_every_n,

@@ -131,17 +132,18 @@ class Producer(object):
             self.proc.daemon = True
             self.proc.start()
 
-    def send_messages(self, partition, *msg):
+    def send_messages(self, topic, partition, *msg):
         """
         Helper method to send produce requests
         """
         if self.async:
             for m in msg:
-                self.queue.put((partition, create_message(m)))
+                self.queue.put((TopicAndPartition(topic, partition),
+                                create_message(m)))
             resp = []
         else:
             messages = [create_message(m) for m in msg]
-            req = ProduceRequest(self.topic, partition, messages)
+            req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                         timeout=self.ack_timeout)

@@ -169,7 +171,6 @@ class SimpleProducer(Producer):
     Params:
 
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
    async - If True, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
    req_acks - A value indicating the acknowledgements that the server must

@@ -180,27 +181,31 @@ class SimpleProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, async=False,
+    def __init__(self, client, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-        self.next_partition = cycle(client.topic_partitions[topic])
+        self.partition_cycles = {}
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t)
 
-    def send_messages(self, *msg):
-        partition = self.next_partition.next()
-        return super(SimpleProducer, self).send_messages(partition, *msg)
+    def _next_partition(self, topic):
+        if topic not in self.partition_cycles:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+        return self.partition_cycles[topic].next()
+
+    def send_messages(self, topic, *msg):
+        partition = self._next_partition(topic)
+        return super(SimpleProducer, self).send_messages(topic, partition, *msg)
 
     def __repr__(self):
-        return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<SimpleProducer batch=%s>' % self.async

@@ -209,7 +214,6 @@ class KeyedProducer(Producer):
     Args:
     client - The kafka client instance
-    topic - The kafka topic to send messages to
    partitioner - A partitioner class that will be used to get the partition
        to send the message to. Must be derived from Partitioner
    async - If True, the messages are sent asynchronously via another

@@ -220,29 +224,34 @@ class KeyedProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, partitioner=None, async=False,
+    def __init__(self, client, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
         if not partitioner:
             partitioner = HashedPartitioner
-        self.partitioner = partitioner(client.topic_partitions[topic])
+        self.partitioner_class = partitioner
+        self.partitioners = {}
 
         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)
 
-    def send(self, key, msg):
-        partitions = self.client.topic_partitions[self.topic]
-        partition = self.partitioner.partition(key, partitions)
-        return self.send_messages(partition, msg)
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partitioners[topic] = \
+                self.partitioner_class(self.client.topic_partitions[topic])
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key, self.client.topic_partitions[topic])
+
+    def send(self, topic, key, msg):
+        partition = self._next_partition(topic, key)
+        return self.send_messages(topic, partition, msg)
 
     def __repr__(self):
-        return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<KeyedProducer batch=%s>' % self.async
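Both producers now share the same underlying pattern: partition-selection state is keyed by topic and built lazily on the first send to that topic, rather than eagerly in `__init__`. A self-contained sketch of that pattern for the round-robin case (the class and callable names here are illustrative, not part of the patch):

```python
from itertools import cycle

class LazyPartitionCycles(object):
    """Illustrative stand-in for SimpleProducer._next_partition:
    one round-robin cycle per topic, created on first use."""

    def __init__(self, load_partitions):
        # load_partitions: callable mapping a topic to its partition list,
        # standing in for load_metadata_for_topics / topic_partitions
        self.load_partitions = load_partitions
        self.partition_cycles = {}

    def next_partition(self, topic):
        # Metadata is fetched only the first time a topic is seen
        if topic not in self.partition_cycles:
            self.partition_cycles[topic] = cycle(self.load_partitions(topic))
        return next(self.partition_cycles[topic])

cycles = LazyPartitionCycles(lambda topic: [0, 1, 2])
assert [cycles.next_partition("my-topic") for _ in range(4)] == [0, 1, 2, 0]
```

KeyedProducer applies the same lazy caching to partitioner instances (`self.partitioners`), which is why it now stores the partitioner class at construction time instead of a single instance.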

test/test_integration.py

@@ -284,8 +284,8 @@ class TestKafkaClient(KafkaTestCase):
     # Producer Tests
 
     def test_simple_producer(self):
-        producer = SimpleProducer(self.client, self.topic)
-        resp = producer.send_messages("one", "two")
+        producer = SimpleProducer(self.client)
+        resp = producer.send_messages(self.topic, "one", "two")
 
         # Will go to partition 0
         self.assertEquals(len(resp), 1)

@@ -293,7 +293,7 @@ class TestKafkaClient(KafkaTestCase):
         self.assertEquals(resp[0].offset, 0)  # offset of first msg
 
         # Will go to partition 1
-        resp = producer.send_messages("three")
+        resp = producer.send_messages(self.topic, "three")
         self.assertEquals(len(resp), 1)
         self.assertEquals(resp[0].error, 0)
         self.assertEquals(resp[0].offset, 0)  # offset of first msg

@@ -315,7 +315,7 @@ class TestKafkaClient(KafkaTestCase):
         self.assertEquals(messages[0].message.value, "three")
 
         # Will go to partition 0
-        resp = producer.send_messages("four", "five")
+        resp = producer.send_messages(self.topic, "four", "five")
         self.assertEquals(len(resp), 1)
         self.assertEquals(resp[0].error, 0)
         self.assertEquals(resp[0].offset, 2)  # offset of first msg

@@ -323,12 +323,12 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_round_robin_partitioner(self):
-        producer = KeyedProducer(self.client, self.topic,
+        producer = KeyedProducer(self.client,
                                  partitioner=RoundRobinPartitioner)
-        producer.send("key1", "one")
-        producer.send("key2", "two")
-        producer.send("key3", "three")
-        producer.send("key4", "four")
+        producer.send(self.topic, "key1", "one")
+        producer.send(self.topic, "key2", "two")
+        producer.send(self.topic, "key3", "three")
+        producer.send(self.topic, "key4", "four")
 
         fetch1 = FetchRequest(self.topic, 0, 0, 1024)
         fetch2 = FetchRequest(self.topic, 1, 0, 1024)

@@ -357,12 +357,12 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_hashed_partitioner(self):
-        producer = KeyedProducer(self.client, self.topic,
+        producer = KeyedProducer(self.client,
                                  partitioner=HashedPartitioner)
-        producer.send(1, "one")
-        producer.send(2, "two")
-        producer.send(3, "three")
-        producer.send(4, "four")
+        producer.send(self.topic, 1, "one")
+        producer.send(self.topic, 2, "two")
+        producer.send(self.topic, 3, "three")
+        producer.send(self.topic, 4, "four")
 
         fetch1 = FetchRequest(self.topic, 0, 0, 1024)
         fetch2 = FetchRequest(self.topic, 1, 0, 1024)

@@ -391,9 +391,9 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_acks_none(self):
-        producer = SimpleProducer(self.client, self.topic,
+        producer = SimpleProducer(self.client,
                                   req_acks=SimpleProducer.ACK_NOT_REQUIRED)
-        resp = producer.send_messages("one")
+        resp = producer.send_messages(self.topic, "one")
         self.assertEquals(len(resp), 0)
 
         fetch = FetchRequest(self.topic, 0, 0, 1024)

@@ -410,9 +410,9 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_acks_local_write(self):
-        producer = SimpleProducer(self.client, self.topic,
+        producer = SimpleProducer(self.client,
                                   req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
-        resp = producer.send_messages("one")
+        resp = producer.send_messages(self.topic, "one")
         self.assertEquals(len(resp), 1)
 
         fetch = FetchRequest(self.topic, 0, 0, 1024)

@@ -430,9 +430,9 @@ class TestKafkaClient(KafkaTestCase):
     def test_acks_cluster_commit(self):
         producer = SimpleProducer(
-            self.client, self.topic,
+            self.client,
             req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
-        resp = producer.send_messages("one")
+        resp = producer.send_messages(self.topic, "one")
         self.assertEquals(len(resp), 1)
 
         fetch = FetchRequest(self.topic, 0, 0, 1024)

@@ -449,8 +449,8 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_async_simple_producer(self):
-        producer = SimpleProducer(self.client, self.topic, async=True)
-        resp = producer.send_messages("one")
+        producer = SimpleProducer(self.client, async=True)
+        resp = producer.send_messages(self.topic, "one")
         self.assertEquals(len(resp), 0)
 
         # Give it some time

@@ -470,9 +470,9 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_async_keyed_producer(self):
-        producer = KeyedProducer(self.client, self.topic, async=True)
-        resp = producer.send("key1", "one")
+        producer = KeyedProducer(self.client, async=True)
+        resp = producer.send(self.topic, "key1", "one")
         self.assertEquals(len(resp), 0)
 
         # Give it some time

@@ -492,14 +492,14 @@ class TestKafkaClient(KafkaTestCase):
         producer.stop()
 
     def test_batched_simple_producer(self):
-        producer = SimpleProducer(self.client, self.topic,
+        producer = SimpleProducer(self.client,
                                   batch_send=True,
                                   batch_send_every_n=10,
                                   batch_send_every_t=20)
 
         # Send 5 messages and do a fetch
         msgs = ["message-%d" % i for i in range(0, 5)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
 
         # Batch mode is async. No ack
         self.assertEquals(len(resp), 0)

@@ -522,7 +522,7 @@ class TestKafkaClient(KafkaTestCase):
         # Send 5 more messages, wait for 2 seconds and do a fetch
         msgs = ["message-%d" % i for i in range(5, 10)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
 
         # Give it some time
         time.sleep(2)

@@ -542,9 +542,9 @@ class TestKafkaClient(KafkaTestCase):
         # Send 7 messages and wait for 20 seconds
         msgs = ["message-%d" % i for i in range(10, 15)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
         msgs = ["message-%d" % i for i in range(15, 17)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
 
         fetch1 = FetchRequest(self.topic, 0, 5, 1024)
         fetch2 = FetchRequest(self.topic, 1, 5, 1024)

@@ -846,25 +846,25 @@ class TestFailover(KafkaTestCase):
     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
-        producer = SimpleProducer(self.client, topic)
+        producer = SimpleProducer(self.client)
 
         for i in range(1, 4):
 
             # XXX unfortunately, the conns dict needs to be warmed for this to work
             # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             # kil leader for partition 0
             broker = self._kill_leader(topic, partition)
 
             # expect failure, reload meta data
             with self.assertRaises(FailedPayloadsError):
-                producer.send_messages('part 1')
-                producer.send_messages('part 2')
+                producer.send_messages(self.topic, 'part 1')
+                producer.send_messages(self.topic, 'part 2')
             time.sleep(1)
 
             # send to new leader
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             broker.open()
             time.sleep(3)

@@ -877,22 +877,22 @@ class TestFailover(KafkaTestCase):
     def test_switch_leader_async(self):
         key, topic, partition = random_string(5), self.topic, 0
-        producer = SimpleProducer(self.client, topic, async=True)
+        producer = SimpleProducer(self.client, async=True)
 
         for i in range(1, 4):
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             # kil leader for partition 0
             broker = self._kill_leader(topic, partition)
 
             # expect failure, reload meta data
-            producer.send_messages('part 1')
-            producer.send_messages('part 2')
+            producer.send_messages(self.topic, 'part 1')
+            producer.send_messages(self.topic, 'part 2')
             time.sleep(1)
 
             # send to new leader
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             broker.open()
             time.sleep(3)

@@ -903,9 +903,9 @@ class TestFailover(KafkaTestCase):
         producer.stop()
 
-    def _send_random_messages(self, producer, n):
+    def _send_random_messages(self, producer, topic, n):
         for j in range(n):
-            resp = producer.send_messages(random_string(10))
+            resp = producer.send_messages(topic, random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
         time.sleep(1)  # give it some time