SimpleConsumer flow changes:

* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* The timeout param for get_messages() is now the actual overall timeout for
  fetching those messages, rather than being split across partitions
  (see the usage sketch after this list)
* Based on https://github.com/mumrah/kafka-python/pull/74 -
  don't increase min_bytes when the consumer fetch buffer size is too small;
  double the buffer size instead
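
As an illustration of the new timeout semantics, here is a minimal usage
sketch (the broker address, group and topic names are hypothetical; the
constructor signatures are the ones kafka-python used at the time):

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)   # hypothetical broker
    consumer = SimpleConsumer(client, "my-group", "my-topic")

    # Block for at most ~5 seconds in total, across all partitions, while
    # collecting up to 100 messages. Previously the 5 seconds would have
    # been split evenly between the partitions.
    messages = consumer.get_messages(count=100, block=True, timeout=5)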

Notes:

Change MultiProcessConsumer and _mp_consume() accordingly.

Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there were new ones in
partition 1. These changes let us block while waiting for messages on all
partitions at once, and reduce the total number of Kafka requests, as
sketched below.
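
Schematically, the combined fetch at the heart of the change looks like this
(a sketch using the names from the patch below, not the exact code):

    # One FetchRequest per partition, all sent in a single round trip, so
    # the broker can satisfy min_bytes from whichever partitions have data
    requests = [FetchRequest(topic, partition, offsets[partition], buffer_size)
                for partition in offsets]
    responses = client.send_fetch_request(requests,
                                          max_wait_time=fetch_max_wait_time,
                                          min_bytes=fetch_min_bytes)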

Use Queue.Queue for the single-process queue instead of the already-imported
multiprocessing.Queue, because the latter doesn't seem to guarantee immediate
availability of items after a put:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
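
Queue.Queue, by contrast, makes a put immediately visible to a get_nowait()
in the same process:

>>> from Queue import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
1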

Author: Omar Ghishan
Date:   2013-12-18 18:36:32 -08:00
Parent: 5dd8d81c9e
Commit: 0c7cf2569e

@@ -3,8 +3,8 @@ from itertools import izip_longest, repeat
 import logging
 import time
 from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -227,6 +227,7 @@ class SimpleConsumer(Consumer):
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool) # defaults to false
+        self.queue = Queue(buffer_size)
 
         super(SimpleConsumer, self).__init__(
             client, group, topic,
@@ -292,122 +293,75 @@ class SimpleConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified time (in seconds)
+                 until count messages is fetched. If None, it will block forever.
         """
         messages = []
-        iterator = self.__iter__()
-
-        # HACK: This splits the timeout between available partitions
-        if timeout:
-            timeout = timeout * 1.0 / len(self.offsets)
-
-        with FetchContext(self, block, timeout):
-            while count > 0:
-                try:
-                    messages.append(next(iterator))
-                except StopIteration:
-                    break
+        if timeout:
+            max_time = time.time() + timeout
+
+        while count > 0 and (timeout is None or timeout > 0):
+            message = self.get_message(block, timeout)
+            if message:
+                messages.append(message)
+                count -= 1
+            else:
+                # Ran out of messages for the last request. If we're not
+                # blocking, break.
+                if not block:
+                    break
+            if timeout:
+                timeout = max_time - time.time()
 
         return messages
 
+    def get_message(self, block=True, timeout=0.1):
+        if self.queue.empty():
+            # The local queue is drained; fetch more from all partitions
+            with FetchContext(self, block, timeout):
+                self._fetch()
+        try:
+            return self.queue.get_nowait()
+        except Empty:
+            return None
+
     def __iter__(self):
-        """
-        Create an iterator per partition. Iterate through them calling next()
-        until they are all exhausted.
-        """
-        iters = {}
-        for partition, offset in self.offsets.items():
-            iters[partition] = self.__iter_partition__(partition, offset)
-
-        if len(iters) == 0:
-            return
-
         while True:
-            if len(iters) == 0:
-                break
-
-            for partition, it in iters.items():
-                try:
-                    if self.partition_info:
-                        yield (partition, it.next())
-                    else:
-                        yield it.next()
-                except StopIteration:
-                    log.debug("Done iterating over partition %s" % partition)
-                    del iters[partition]
-
-                    # skip auto-commit since we didn't yield anything
-                    continue
-
-                # Count, check and commit messages if necessary
-                self.count_since_commit += 1
-                self._auto_commit()
-
-    def __iter_partition__(self, partition, offset):
-        """
-        Iterate over the messages in a partition. Create a FetchRequest
-        to get back a batch of messages, yield them one at a time.
-        After a batch is exhausted, start a new batch unless we've reached
-        the end of this partition.
-        """
-        # The offset that is stored in the consumer is the offset that
-        # we have consumed. In subsequent iterations, we are supposed to
-        # fetch the next message (that is from the next offset)
-        # However, for the 0th message, the offset should be as-is.
-        # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
-        # problematic, since 0 is offset of a message which we have not yet
-        # consumed.
-        if self.fetch_started[partition]:
-            offset += 1
-
-        fetch_size = self.fetch_min_bytes
+            message = self.get_message(True, 100)
+            if message:
+                yield message
+            else:
+                # In case we did not receive any message, give up the CPU for
+                # a while before we try again
+                time.sleep(0.1)
 
-        while True:
-            # use MaxBytes = client's bufsize since we're only
-            # fetching one topic + partition
-            req = FetchRequest(
-                self.topic, partition, offset, self.buffer_size)
-
-            (resp,) = self.client.send_fetch_request(
-                [req],
-                max_wait_time=self.fetch_max_wait_time,
-                min_bytes=fetch_size)
-
-            assert resp.topic == self.topic
-            assert resp.partition == partition
+    def _fetch(self):
+        # Create fetch request payloads for all the partitions
+        requests = []
+        partitions = self.offsets.keys()
+        for partition in partitions:
+            requests.append(FetchRequest(self.topic, partition,
+                                         self.offsets[partition],
+                                         self.buffer_size))
 
-            next_offset = None
-            try:
-                for message in resp.messages:
-                    next_offset = message.offset
-
-                    # update the offset before the message is yielded. This
-                    # is so that the consumer state is not lost in certain
-                    # cases.
-                    #
-                    # For eg: the message is yielded and consumed by the
-                    # caller, but the caller does not come back into the
-                    # generator again. The message will be consumed but the
-                    # status will not be updated in the consumer
-                    self.fetch_started[partition] = True
-                    self.offsets[partition] = message.offset
-                    yield message
-            except ConsumerFetchSizeTooSmall, e:
-                fetch_size *= 1.5
-                log.warn(
-                    "Fetch size too small, increasing to %d (1.5x) and retry",
-                    fetch_size)
-                continue
-            except ConsumerNoMoreData, e:
-                log.debug("Iteration was ended by %r", e)
-
-            if next_offset is None:
-                break
-            else:
-                offset = next_offset + 1
+        # Send a single fetch request covering all partitions
+        responses = self.client.send_fetch_request(
+            requests,
+            max_wait_time=int(self.fetch_max_wait_time),
+            min_bytes=self.fetch_min_bytes)
+
+        for resp in responses:
+            partition = resp.partition
+            try:
+                for message in resp.messages:
+                    # Update the partition offset before handing the message
+                    # out, so consumer state is not lost
+                    self.offsets[partition] = message.offset + 1
+
+                    # Count, check and commit messages if necessary
+                    self.count_since_commit += 1
+                    self._auto_commit()
+
+                    if self.partition_info:
+                        self.queue.put((partition, message))
+                    else:
+                        self.queue.put(message)
+            except ConsumerFetchSizeTooSmall, e:
+                self.buffer_size *= 2
+                log.warn("Fetch size too small, increasing to %d (2x) and retry",
+                         self.buffer_size)
+            except ConsumerNoMoreData, e:
+                log.debug("Iteration was ended by %r", e)
+            except StopIteration:
+                # Stop iterating through this partition
+                log.debug("Done iterating over partition %s" % partition)
 
 
 def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
@@ -446,8 +400,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
         # indicates a specific number of messages, follow that advice
         count = 0
-        for partition, message in consumer:
-            queue.put((partition, message))
+
+        message = consumer.get_message()
+        if message:
+            queue.put(message)
             count += 1
 
             # We have reached the required size. The controller might have
@@ -457,11 +412,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # can reset the 'start' event
             if count == size.value:
                 pause.wait()
-                break
 
-        # In case we did not receive any message, give up the CPU for
-        # a while before we try again
-        if count == 0:
-            time.sleep(0.1)
+        else:
+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            time.sleep(0.1)
 
     consumer.stop()
@@ -507,7 +461,7 @@ class MultiProcessConsumer(Consumer):
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue(1024) # Child consumers dump messages into this
+        self.queue = MPQueue(1024) # Child consumers dump messages into this
         self.start = Event() # Indicates the consumers to start fetch
         self.exit = Event() # Requests the consumers to shutdown
         self.pause = Event() # Requests the consumers to pause fetch
@@ -589,8 +543,8 @@ class MultiProcessConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified time (in seconds)
+                 until count messages is fetched. If None, it will block forever.
         """
         messages = []
@@ -601,7 +555,10 @@ class MultiProcessConsumer(Consumer):
         self.size.value = count
         self.pause.clear()
 
-        while count > 0:
+        if timeout:
+            max_time = time.time() + timeout
+
+        while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
             # go into overdrive and keep consuming thousands of
@@ -621,6 +578,7 @@ class MultiProcessConsumer(Consumer):
             self.count_since_commit += 1
             self._auto_commit()
             count -= 1
+            timeout = max_time - time.time()
 
         self.size.value = 0
         self.start.clear()
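
For reference, a short sketch of driving the reworked SimpleConsumer after
this change (client/consumer setup as in the sketch above; process() is a
hypothetical handler):

    consumer = SimpleConsumer(client, "my-group", "my-topic")

    # Non-blocking poll: returns a single message from the shared queue,
    # or None if nothing is available right now
    message = consumer.get_message(block=False)

    # Iteration now drains the single shared queue that _fetch() fills
    # from all partitions, instead of one iterator per partition
    for message in consumer:
        process(message)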