Resolve conflicts for #106

This commit is contained in:
Omar Ghishan
2014-01-28 14:12:49 -08:00

View File

@@ -1,6 +1,5 @@
from __future__ import absolute_import from __future__ import absolute_import
from collections import defaultdict
from itertools import izip_longest, repeat from itertools import izip_longest, repeat
import logging import logging
import time import time
@@ -235,6 +234,12 @@ class SimpleConsumer(Consumer):
buffer_size=FETCH_BUFFER_SIZE_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None): iter_timeout=None):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
if max_buffer_size is not None and buffer_size > max_buffer_size: if max_buffer_size is not None and buffer_size > max_buffer_size:
raise ValueError("buffer_size (%d) is greater than " raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ class SimpleConsumer(Consumer):
self.partition_info = False # Do not return partition info in msgs self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout self.iter_timeout = iter_timeout
self.queue = Queue() self.queue = Queue()
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
def __repr__(self): def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys())) (self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ class SimpleConsumer(Consumer):
else: else:
raise ValueError("Unexpected value for `whence`, %d" % whence) raise ValueError("Unexpected value for `whence`, %d" % whence)
# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
self.queue = Queue()
def get_messages(self, count=1, block=True, timeout=0.1): def get_messages(self, count=1, block=True, timeout=0.1):
""" """
Fetch the specified number of messages Fetch the specified number of messages
@@ -316,33 +318,69 @@ class SimpleConsumer(Consumer):
it will block forever. it will block forever.
""" """
messages = [] messages = []
if timeout: if timeout is not None:
max_time = time.time() + timeout max_time = time.time() + timeout
new_offsets = {}
while count > 0 and (timeout is None or timeout > 0): while count > 0 and (timeout is None or timeout > 0):
message = self.get_message(block, timeout) result = self._get_message(block, timeout, get_partition_info=True,
if message: update_offset=False)
if result:
partition, message = result
if self.partition_info:
messages.append(result)
else:
messages.append(message) messages.append(message)
new_offsets[partition] = message.offset + 1
count -= 1 count -= 1
else: else:
# Ran out of messages for the last request. # Ran out of messages for the last request.
if not block: if not block:
# If we're not blocking, break. # If we're not blocking, break.
break break
if timeout: if timeout is not None:
# If we're blocking and have a timeout, reduce it to the # If we're blocking and have a timeout, reduce it to the
# appropriate value # appropriate value
timeout = max_time - time.time() timeout = max_time - time.time()
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
return messages return messages
def get_message(self, block=True, timeout=0.1): def get_message(self, block=True, timeout=0.1, get_partition_info=None):
return self._get_message(block, timeout, get_partition_info)
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
update_offset=True):
"""
If no messages can be fetched, returns None.
If get_partition_info is None, it defaults to self.partition_info
If get_partition_info is True, returns (partition, message)
If get_partition_info is False, returns message
"""
if self.queue.empty(): if self.queue.empty():
# We're out of messages, go grab some more. # We're out of messages, go grab some more.
with FetchContext(self, block, timeout): with FetchContext(self, block, timeout):
self._fetch() self._fetch()
try: try:
return self.queue.get_nowait() partition, message = self.queue.get_nowait()
if update_offset:
# Update partition offset
self.offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
if get_partition_info is None:
get_partition_info = self.partition_info
if get_partition_info:
return partition, message
else:
return message
except Empty: except Empty:
return None return None
@@ -367,11 +405,11 @@ class SimpleConsumer(Consumer):
def _fetch(self): def _fetch(self):
# Create fetch request payloads for all the partitions # Create fetch request payloads for all the partitions
requests = [] requests = []
partitions = self.offsets.keys() partitions = self.fetch_offsets.keys()
while partitions: while partitions:
for partition in partitions: for partition in partitions:
requests.append(FetchRequest(self.topic, partition, requests.append(FetchRequest(self.topic, partition,
self.offsets[partition], self.fetch_offsets[partition],
self.buffer_size)) self.buffer_size))
# Send request # Send request
responses = self.client.send_fetch_request( responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@ class SimpleConsumer(Consumer):
partition = resp.partition partition = resp.partition
try: try:
for message in resp.messages: for message in resp.messages:
# Update partition offset
self.offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
# Put the message in our queue # Put the message in our queue
if self.partition_info:
self.queue.put((partition, message)) self.queue.put((partition, message))
else: self.fetch_offsets[partition] = message.offset + 1
self.queue.put(message)
except ConsumerFetchSizeTooSmall, e: except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size): self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ class MultiProcessConsumer(Consumer):
break break
# Count, check and commit messages if necessary # Count, check and commit messages if necessary
self.offsets[partition] = message.offset self.offsets[partition] = message.offset + 1
self.start.clear() self.start.clear()
yield message
self.count_since_commit += 1 self.count_since_commit += 1
self._auto_commit() self._auto_commit()
yield message
self.start.clear() self.start.clear()
@@ -613,9 +641,10 @@ class MultiProcessConsumer(Consumer):
self.size.value = count self.size.value = count
self.pause.clear() self.pause.clear()
if timeout: if timeout is not None:
max_time = time.time() + timeout max_time = time.time() + timeout
new_offsets = {}
while count > 0 and (timeout is None or timeout > 0): while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty # Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not # By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@ class MultiProcessConsumer(Consumer):
break break
messages.append(message) messages.append(message)
new_offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.offsets[partition] = message.offset
self.count_since_commit += 1
self._auto_commit()
count -= 1 count -= 1
if timeout is not None:
timeout = max_time - time.time() timeout = max_time - time.time()
self.size.value = 0 self.size.value = 0
self.start.clear() self.start.clear()
self.pause.set() self.pause.set()
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
return messages return messages