From 8b25625c7f3c79bc495f78c4d06fe54087a6fe90 Mon Sep 17 00:00:00 2001
From: Mahendra M
Date: Wed, 26 Jun 2013 14:57:20 +0530
Subject: [PATCH] Got batched mode to work properly

---
 kafka/producer.py | 107 +++++++++++++++++++++-------------------------
 1 file changed, 48 insertions(+), 59 deletions(-)

diff --git a/kafka/producer.py b/kafka/producer.py
index be80c62..5eb1bd8 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,4 +1,5 @@
-from datetime import datetime
+from collections import defaultdict
+from datetime import datetime, timedelta
 from itertools import cycle
 from multiprocessing import Queue, Process
 from Queue import Empty
@@ -52,9 +53,9 @@ class Producer(object):
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
-            handler = self._send_upstream_batched
         else:
-            handler = self._send_upstream
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
 
         self.client = client
         self.async = async
@@ -66,38 +67,11 @@ class Producer(object):
 
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=handler, args=(self.queue,))
+            self.proc = Process(target=self._send_upstream, args=(self.queue,))
             self.proc.daemon = True   # Process will die if main thread exits
             self.proc.start()
 
-    def _send_async_requests(self, reqs):
-        """
-        Send a bunch of requests upstream and log any errors
-        """
-        if not reqs:
-            return
-
-        # Ignore any acks in the async mode and just log exceptions
-        try:
-            self.client.send_produce_request(reqs, acks=self.req_acks,
-                                             timeout=self.ack_timeout)
-        except Exception as exp:
-            log.error("Error sending message", exc_info=sys.exc_info())
-
     def _send_upstream(self, queue):
-        """
-        Listen on the queue for messages and send them upstream to the brokers
-        """
-        while True:
-            req = queue.get()
-
-            if req == STOP_ASYNC_PRODUCER:
-                log.info("Stopping async producer")
-                break
-
-            self._send_async_requests([req])
-
-    def _send_upstream_batches(self, queue):
         """
         Listen on the queue for a specified number of messages or till
         a specified timeout and send them upstream to the brokers in one
@@ -107,38 +81,48 @@ class Producer(object):
 
         while not stop:
             timeout = self.batch_time
-            stop = datetime.now() + timedelta(seconds=timeout)
+            send_at = datetime.now() + timedelta(seconds=timeout)
             count = self.batch_size
-            reqs = []
+            msgset = defaultdict(list)
 
             # Keep fetching till we gather enough messages or a
             # timeout is reached
             while count > 0 and timeout >= 0:
                 try:
-                    req = queue.get(timeout)
-
-                    if req == STOP_ASYNC_PRODUCER:
-                        stop = True
-                        break
-                    reqs.append(req)
+                    partition, msg = queue.get(timeout=timeout)
                 except Empty:
                     break
 
+                # Check if the controller has requested us to stop
+                if partition == STOP_ASYNC_PRODUCER:
+                    stop = True
+                    break
+
                 # Adjust the timeout to match the remaining period
                 count -= 1
-                timeout = (stop - datetime.now()).total_seconds()
+                timeout = (send_at - datetime.now()).total_seconds()
+                msgset[partition].append(msg)
 
             # Send collected requests upstream
-            self._send_async_requests(reqs)
+            for partition, messages in msgset.items():
+                try:
+                    req = ProduceRequest(self.topic, partition, messages)
+                    self.client.send_produce_request([req], acks=self.req_acks,
+                                                     timeout=self.ack_timeout)
+                except Exception as exp:
+                    log.error("Error sending message", exc_info=sys.exc_info())
 
-    def send_request(self, req):
+    def send_messages(self, partition, *msg):
         """
         Helper method to send produce requests
        """
-        resp = []
         if self.async:
-            self.queue.put(req)
+            for m in msg:
+                self.queue.put((partition, create_message(m)))
+            resp = []
         else:
+            messages = [create_message(m) for m in msg]
+            req = ProduceRequest(self.topic, partition, messages)
             resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                     timeout=self.ack_timeout)
         return resp
@@ -149,7 +133,7 @@ class Producer(object):
         forcefully cleaning up.
         """
         if self.async:
-            self.queue.put(STOP_ASYNC_PRODUCER)
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
             self.proc.join(timeout)
 
             if self.proc.is_alive():
@@ -175,18 +159,22 @@ class SimpleProducer(Producer):
     """
     def __init__(self, client, topic, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
         client._load_metadata_for_topics(topic)
         self.next_partition = cycle(client.topic_partitions[topic])
 
-        super(SimpleProducer, self).__init__(client, async,
-                                             req_acks, ack_timeout)
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)
 
     def send_messages(self, *msg):
-        req = ProduceRequest(self.topic, self.next_partition.next(),
-                             messages=[create_message(m) for m in msg])
-        return self.send_request(req)
+        partition = self.next_partition.next()
+        super(SimpleProducer, self).send_messages(partition, *msg)
 
 
 class KeyedProducer(Producer):
@@ -208,7 +196,10 @@ class KeyedProducer(Producer):
     """
     def __init__(self, client, topic, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
         client._load_metadata_for_topics(topic)
 
@@ -217,14 +208,12 @@ class KeyedProducer(Producer):
 
         self.partitioner = partitioner(client.topic_partitions[topic])
 
-        super(KeyedProducer, self).__init__(client, async,
-                                            req_acks, ack_timeout)
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)
 
     def send(self, key, msg):
         partitions = self.client.topic_partitions[self.topic]
         partition = self.partitioner.partition(key, partitions)
-
-        req = ProduceRequest(self.topic, partition,
-                             messages=[create_message(msg)])
-
-        return self.send_request(req)
+        return self.send_messages(partition, msg)
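
Note for reviewers (not part of the patch): a minimal usage sketch of the
batched mode this change enables. The broker host/port and topic name are
assumptions for illustration; KafkaClient refers to kafka.client.KafkaClient
as it exists in this tree, and stop() is assumed to keep its default join
timeout.

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    # Assumed broker address and topic name -- adjust for your setup.
    kafka = KafkaClient("localhost", 9092)

    # batch_send=True forces async mode: send_messages() enqueues
    # (partition, message) tuples, and the background process groups them
    # per partition into one ProduceRequest each, flushing once 20
    # messages accumulate or 60 seconds pass, whichever comes first.
    producer = SimpleProducer(kafka, "my-topic", batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

    # Returns immediately; the actual produce happens in the background.
    producer.send_messages("message one", "message two")

    # stop() enqueues the STOP_ASYNC_PRODUCER sentinel and joins the
    # background process.
    producer.stop()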