Add comments and maintain 80 character line limit
This commit is contained in:
@@ -302,8 +302,9 @@ class SimpleConsumer(Consumer):
|
||||
|
||||
count: Indicates the maximum number of messages to be fetched
|
||||
block: If True, the API will block till some messages are fetched.
|
||||
timeout: If block is True, the function will block for the specified time (in seconds)
|
||||
until count messages is fetched. If None, it will block forever.
|
||||
timeout: If block is True, the function will block for the specified
|
||||
time (in seconds) until count messages is fetched. If None,
|
||||
it will block forever.
|
||||
"""
|
||||
messages = []
|
||||
if timeout:
|
||||
@@ -315,16 +316,20 @@ class SimpleConsumer(Consumer):
|
||||
messages.append(message)
|
||||
count -= 1
|
||||
else:
|
||||
# Ran out of messages for the last request. If we're not blocking, break.
|
||||
# Ran out of messages for the last request.
|
||||
if not block:
|
||||
# If we're not blocking, break.
|
||||
break
|
||||
if timeout:
|
||||
# If we're blocking and have a timeout, reduce it to the
|
||||
# appropriate value
|
||||
timeout = max_time - time.time()
|
||||
|
||||
return messages
|
||||
|
||||
def get_message(self, block=True, timeout=0.1):
|
||||
if self.queue.empty():
|
||||
# We're out of messages, go grab some more.
|
||||
with FetchContext(self, block, timeout):
|
||||
self._fetch()
|
||||
try:
|
||||
@@ -351,29 +356,39 @@ class SimpleConsumer(Consumer):
|
||||
break
|
||||
|
||||
def _fetch(self):
|
||||
# Create fetch request payloads for all the partitions
|
||||
requests = []
|
||||
partitions = self.offsets.keys()
|
||||
for partition in partitions:
|
||||
requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size))
|
||||
requests.append(FetchRequest(self.topic, partition,
|
||||
self.offsets[partition],
|
||||
self.buffer_size))
|
||||
# Send request
|
||||
responses = self.client.send_fetch_request(
|
||||
requests,
|
||||
max_wait_time=int(self.fetch_max_wait_time),
|
||||
min_bytes=self.fetch_min_bytes)
|
||||
|
||||
for resp in responses:
|
||||
partition = resp.partition
|
||||
try:
|
||||
for message in resp.messages:
|
||||
# Update partition offset
|
||||
self.offsets[partition] = message.offset + 1
|
||||
|
||||
# Count, check and commit messages if necessary
|
||||
self.count_since_commit += 1
|
||||
self._auto_commit()
|
||||
|
||||
# Put the message in our queue
|
||||
if self.partition_info:
|
||||
self.queue.put((partition, message))
|
||||
else:
|
||||
self.queue.put(message)
|
||||
except ConsumerFetchSizeTooSmall, e:
|
||||
self.buffer_size *= 2
|
||||
log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size)
|
||||
log.warn("Fetch size too small, increase to %d (2x) and retry",
|
||||
self.buffer_size)
|
||||
except ConsumerNoMoreData, e:
|
||||
log.debug("Iteration was ended by %r", e)
|
||||
except StopIteration:
|
||||
@@ -560,8 +575,9 @@ class MultiProcessConsumer(Consumer):
|
||||
|
||||
count: Indicates the maximum number of messages to be fetched
|
||||
block: If True, the API will block till some messages are fetched.
|
||||
timeout: If block is True, the function will block for the specified time (in seconds)
|
||||
until count messages is fetched. If None, it will block forever.
|
||||
timeout: If block is True, the function will block for the specified
|
||||
time (in seconds) until count messages is fetched. If None,
|
||||
it will block forever.
|
||||
"""
|
||||
messages = []
|
||||
|
||||
|
||||
Reference in New Issue
Block a user