Got batched mode to work properly

This commit is contained in:
Mahendra M
2013-06-26 14:57:20 +05:30
parent c6626fe87e
commit 8b25625c7f


@@ -1,4 +1,5 @@
-from datetime import datetime
+from collections import defaultdict
+from datetime import datetime, timedelta
 from itertools import cycle
 from multiprocessing import Queue, Process
 from Queue import Empty
@@ -52,9 +53,9 @@ class Producer(object):
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
-            handler = self._send_upstream_batched
         else:
-            handler = self._send_upstream
+            batch_send_every_n = 1
+            batch_send_every_t = 3600

         self.client = client
         self.async = async
@@ -66,38 +67,11 @@ class Producer(object):
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=handler, args=(self.queue,))
+            self.proc = Process(target=self._send_upstream, args=(self.queue,))
             self.proc.daemon = True  # Process will die if main thread exits
             self.proc.start()

-    def _send_async_requests(self, reqs):
-        """
-        Send a bunch of requests upstream and log any errors
-        """
-        if not reqs:
-            return
-
-        # Ignore any acks in the async mode and just log exceptions
-        try:
-            self.client.send_produce_request(reqs, acks=self.req_acks,
-                                             timeout=self.ack_timeout)
-        except Exception as exp:
-            log.error("Error sending message", exc_info=sys.exc_info())
-
     def _send_upstream(self, queue):
         """
-        Listen on the queue for messages and send them upstream to the brokers
-        """
-        while True:
-            req = queue.get()
-
-            if req == STOP_ASYNC_PRODUCER:
-                log.info("Stopping async producer")
-                break
-
-            self._send_async_requests([req])
-
-    def _send_upstream_batches(self, queue):
-        """
         Listen on the queue for a specified number of messages or till
         a specified timeout and send them upstream to the brokers in one
@@ -107,38 +81,48 @@ class Producer(object):
         while not stop:
             timeout = self.batch_time
-            stop = datetime.now() + timedelta(seconds=timeout)
+            send_at = datetime.now() + timedelta(seconds=timeout)
             count = self.batch_size
-            reqs = []
+            msgset = defaultdict(list)

             # Keep fetching till we gather enough messages or a
             # timeout is reached
             while count > 0 and timeout >= 0:
                 try:
-                    req = queue.get(timeout)
-
-                    if req == STOP_ASYNC_PRODUCER:
-                        stop = True
-                        break
-
-                    reqs.append(req)
+                    partition, msg = queue.get(timeout=timeout)
                 except Empty:
                     break

+                # Check if the controller has requested us to stop
+                if partition == STOP_ASYNC_PRODUCER:
+                    stop = True
+                    break
+
                 # Adjust the timeout to match the remaining period
                 count -= 1
-                timeout = (stop - datetime.now()).total_seconds()
+                timeout = (send_at - datetime.now()).total_seconds()
+                msgset[partition].append(msg)

             # Send collected requests upstream
-            self._send_async_requests(reqs)
+            for partition, messages in msgset.items():
+                try:
+                    req = ProduceRequest(self.topic, partition, messages)
+                    self.client.send_produce_request([req], acks=self.req_acks,
+                                                     timeout=self.ack_timeout)
+                except Exception as exp:
+                    log.error("Error sending message", exc_info=sys.exc_info())

-    def send_request(self, req):
+    def send_messages(self, partition, *msg):
         """
         Helper method to send produce requests
         """
-        resp = []
         if self.async:
-            self.queue.put(req)
+            for m in msg:
+                self.queue.put((partition, create_message(m)))
+            resp = []
         else:
+            messages = [create_message(m) for m in msg]
+            req = ProduceRequest(self.topic, partition, messages)
             resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                     timeout=self.ack_timeout)

         return resp
@@ -149,7 +133,7 @@ class Producer(object):
         forcefully cleaning up.
         """
         if self.async:
-            self.queue.put(STOP_ASYNC_PRODUCER)
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
             self.proc.join(timeout)

             if self.proc.is_alive():
@@ -175,18 +159,22 @@ class SimpleProducer(Producer):
     """
     def __init__(self, client, topic, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
         client._load_metadata_for_topics(topic)
         self.next_partition = cycle(client.topic_partitions[topic])

-        super(SimpleProducer, self).__init__(client, async,
-                                             req_acks, ack_timeout)
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)

     def send_messages(self, *msg):
-        req = ProduceRequest(self.topic, self.next_partition.next(),
-                             messages=[create_message(m) for m in msg])
-        return self.send_request(req)
+        partition = self.next_partition.next()
+        super(SimpleProducer, self).send_messages(partition, *msg)


 class KeyedProducer(Producer):
@@ -208,7 +196,10 @@ class KeyedProducer(Producer):
     """
     def __init__(self, client, topic, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
         client._load_metadata_for_topics(topic)
@@ -217,14 +208,12 @@ class KeyedProducer(Producer):
         self.partitioner = partitioner(client.topic_partitions[topic])

-        super(KeyedProducer, self).__init__(client, async,
-                                            req_acks, ack_timeout)
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)

     def send(self, key, msg):
         partitions = self.client.topic_partitions[self.topic]
         partition = self.partitioner.partition(key, partitions)
-        req = ProduceRequest(self.topic, partition,
-                             messages=[create_message(msg)])
-        return self.send_request(req)
+        return self.send_messages(partition, msg)
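
For reference, a minimal usage sketch of the batched mode this commit enables. The broker address, topic name, and batch thresholds are illustrative only; the KafkaClient(host, port) constructor and import paths are assumed from the project README of this era rather than taken from this diff.

from kafka.client import KafkaClient      # import path assumed, not part of this diff
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost", 9092)    # hypothetical broker address

# Queue messages in the background process and flush them either once
# 20 messages have accumulated or 60 seconds have passed, whichever
# comes first (batch_send=True forces async mode).
producer = SimpleProducer(kafka, "my-topic", batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)

producer.send_messages("message 1", "message 2", "message 3")

# Puts (STOP_ASYNC_PRODUCER, None) on the queue and joins the process
producer.stop()

In batched (async) mode the messages are only enqueued, so no broker response is returned from send_messages(); acknowledgements are available only on the synchronous path.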