Merge pull request #503 from dpkp/iterator_fetches
Improve iterator fetch requests
@@ -338,17 +338,16 @@ class KafkaClient(object):
        # select on reads across all connected sockets, blocking up to timeout
        sockets = dict([(conn._sock, conn)
                        for conn in six.itervalues(self._conns)
                        if (conn.state is ConnectionStates.CONNECTED
                            and conn.in_flight_requests)])
                        if conn.state is ConnectionStates.CONNECTED
                        and conn.in_flight_requests])
        if not sockets:
            # if sockets are connecting, we can wake when they are writeable
            if self._connecting:
                sockets = [self._conns[node]._sock for node in self._connecting]
                select.select([], sockets, [], timeout)
            # otherwise just sleep to prevent CPU spinning
            else:
                log.debug('Nothing to do in _poll -- sleeping for %s', timeout)
                time.sleep(timeout)
            elif timeout:
                log.warning('_poll called with a timeout, but nothing to do'
                            ' -- this can cause high CPU usage during idle')
            return []

        ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
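For context, the `_poll` change above selects only on sockets that actually have a request in flight, rather than on every connected socket. A minimal standalone sketch of that selection pattern follows; `PendingConn` and `poll_readable` are hypothetical names, not part of the kafka-python API:

```python
import select


class PendingConn(object):
    """Hypothetical connection wrapper: a socket plus its in-flight request queue."""
    def __init__(self, sock, in_flight_requests):
        self._sock = sock
        self.in_flight_requests = in_flight_requests


def poll_readable(conns, timeout):
    # Only select on sockets that are waiting for a response; selecting on
    # idle sockets would just block for the full timeout with nothing to read.
    sockets = dict((conn._sock, conn)
                   for conn in conns
                   if conn.in_flight_requests)
    if not sockets:
        return []
    ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
    return [sockets[sock] for sock in ready]
```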
@@ -36,6 +36,7 @@ class Fetcher(six.Iterator):
        'fetch_max_wait_ms': 500,
        'max_partition_fetch_bytes': 1048576,
        'check_crcs': True,
        'iterator_refetch_records': 1, # undocumented -- interface may change
    }

    def __init__(self, client, subscriptions, **configs):
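The new `iterator_refetch_records` key slots into the existing defaults-dict pattern, where the constructor copies `DEFAULT_CONFIG` and overrides only caller-supplied keys. A simplified sketch of that resolution step (not the full `Fetcher.__init__`):

```python
import copy

DEFAULT_CONFIG = {
    'fetch_max_wait_ms': 500,
    'max_partition_fetch_bytes': 1048576,
    'check_crcs': True,
    'iterator_refetch_records': 1,  # undocumented -- interface may change
}


def resolve_config(**configs):
    # Start from the defaults, then override only the keys the caller supplied.
    config = copy.copy(DEFAULT_CONFIG)
    for key in config:
        if key in configs:
            config[key] = configs.pop(key)
    return config


print(resolve_config(check_crcs=False)['check_crcs'])  # False
print(resolve_config()['iterator_refetch_records'])    # 1
```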
@@ -80,15 +81,28 @@ class Fetcher(six.Iterator):
        self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
        self._record_too_large_partitions = dict() # {topic_partition: offset}
        self._iterator = None
        self._fetch_futures = collections.deque()

        #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)

    def init_fetches(self):
        """Send FetchRequests asynchronously for all assigned partitions.

        Note: noop if there are unconsumed records internal to the fetcher

        Returns:
            List of Futures: each future resolves to a FetchResponse
        """
        # We need to be careful when creating fetch records during iteration
        # so we verify that there are no records in the deque, or in an
        # iterator
        if self._records or self._iterator:
            log.debug('Skipping init_fetches because there are unconsumed'
                      ' records internally')
            return []
        return self._init_fetches()

    def _init_fetches(self):
        futures = []
        for node_id, request in six.iteritems(self._create_fetch_requests()):
            if self._client.ready(node_id):
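The split above gives a guarded public entry point (`init_fetches`) and an unguarded private one (`_init_fetches`). A toy version of that shape, with hypothetical names, to show why the guard returns an empty list rather than blocking:

```python
import collections
import logging

log = logging.getLogger(__name__)


class FetchScheduler(object):
    """Hypothetical sketch of the guarded-public / unguarded-private split."""

    def __init__(self):
        self._records = collections.deque()
        self._iterator = None

    def init_fetches(self):
        # Public path: refuse to issue new requests while buffered data or an
        # active iterator could still hand records to the application.
        if self._records or self._iterator:
            log.debug('Skipping init_fetches because there are unconsumed'
                      ' records internally')
            return []
        return self._init_fetches()

    def _init_fetches(self):
        # Private path: callers that know pipelining is safe (the iterator
        # itself) can bypass the guard.
        return []  # would return a list of futures in the real Fetcher
```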
@@ -97,8 +111,23 @@ class Fetcher(six.Iterator):
            future.add_callback(self._handle_fetch_response, request)
            future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
            futures.append(future)
        self._fetch_futures.extend(futures)
        self._clean_done_fetch_futures()
        return futures

    def _clean_done_fetch_futures(self):
        while True:
            if not self._fetch_futures:
                break
            if not self._fetch_futures[0].is_done:
                break
            self._fetch_futures.popleft()

    def in_flight_fetches(self):
        """Return True if there are any unprocessed FetchRequests in flight."""
        self._clean_done_fetch_futures()
        return bool(self._fetch_futures)

    def update_fetch_positions(self, partitions):
        """Update the fetch positions for the provided partitions.

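`_clean_done_fetch_futures` trims completed futures from the head of the deque and stops at the first one still pending, which lets `in_flight_fetches` reduce to a truthiness check. A self-contained illustration with a dummy future class (only the `is_done` attribute matters here):

```python
import collections


class DummyFuture(object):
    """Stand-in future exposing just the attribute the cleanup loop inspects."""
    def __init__(self, is_done):
        self.is_done = is_done


def clean_done_fetch_futures(fetch_futures):
    # Pop completed futures off the left; stop at the first pending one,
    # mirroring the head-only trimming in the Fetcher above.
    while fetch_futures and fetch_futures[0].is_done:
        fetch_futures.popleft()


futures = collections.deque([DummyFuture(True), DummyFuture(False), DummyFuture(True)])
clean_done_fetch_futures(futures)
print(len(futures))   # 2 -- a trailing done future stays behind a pending one
print(bool(futures))  # True, i.e. "there are still fetches in flight"
```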
@@ -301,9 +330,9 @@ class Fetcher(six.Iterator):

            elif fetch_offset == position:
                next_offset = messages[-1][0] + 1
                log.debug("Returning fetched records at offset %d for assigned"
                          " partition %s and update position to %s", position,
                          tp, next_offset)
                log.log(0, "Returning fetched records at offset %d for assigned"
                           " partition %s and update position to %s", position,
                           tp, next_offset)
                self._subscriptions.assignment[tp].position = next_offset

                for record in self._unpack_message_set(tp, messages):
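The swap from `log.debug` to `log.log(0, ...)` above demotes a per-batch message below DEBUG: level 0 sits under every standard logging level, so handlers configured at DEBUG or above never emit it. A quick demonstration:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('fetch-example')

log.debug('emitted: DEBUG is enabled')         # printed
log.log(0, 'suppressed: level 0 < DEBUG (10)') # not printed under this config
```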
@@ -339,13 +368,18 @@ class Fetcher(six.Iterator):
            self._raise_if_unauthorized_topics()
            self._raise_if_record_too_large()

            # Send additional FetchRequests when the internal queue is low
            # this should enable moderate pipelining
            if len(self._records) <= self.config['iterator_refetch_records']:
                self._init_fetches()

            (fetch_offset, tp, messages) = self._records.popleft()

            if not self._subscriptions.is_assigned(tp):
                # this can happen when a rebalance happened before
                # fetched records are returned
                log.warning("Not returning fetched records for partition %s"
                            " since it is no longer assigned", tp)
                log.debug("Not returning fetched records for partition %s"
                          " since it is no longer assigned", tp)
                continue

            # note that the consumed position should always be available
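The threshold check above tops up the fetch pipeline from inside the iterator so that the next network round-trip overlaps with local processing. The shape of that pattern, reduced to plain deques (all names here are hypothetical, not the real Fetcher):

```python
import collections

ITERATOR_REFETCH_RECORDS = 1  # refill threshold, mirroring the new config key
pending_responses = collections.deque(['batch-2', 'batch-3', 'batch-4'])


def refill(records):
    # Hypothetical stand-in for _init_fetches(): move one "response" into the
    # local buffer, as if another FetchRequest had just completed.
    if pending_responses:
        records.append(pending_responses.popleft())


def consume(records):
    """Yield buffered batches, asking for more before the buffer runs dry."""
    while records:
        # Top up while at least one batch is still buffered, so the next
        # network round-trip can overlap with local processing.
        if len(records) <= ITERATOR_REFETCH_RECORDS:
            refill(records)
        yield records.popleft()


buffered = collections.deque(['batch-1'])
print(list(consume(buffered)))  # ['batch-1', 'batch-2', 'batch-3', 'batch-4']
```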
@@ -354,23 +388,29 @@ class Fetcher(six.Iterator):
            if not self._subscriptions.is_fetchable(tp):
                # this can happen when a partition consumption paused before
                # fetched records are returned
                log.warning("Not returning fetched records for assigned partition"
                            " %s since it is no longer fetchable", tp)
                log.debug("Not returning fetched records for assigned partition"
                          " %s since it is no longer fetchable", tp)

            elif fetch_offset == position:
                log.log(0, "Returning fetched records at offset %d for assigned"
                           " partition %s", position, tp)
                for msg in self._unpack_message_set(tp, messages):

                    # Because we are in a generator, it is possible for
                    # assignment to change between yield calls
                    # so we need to re-check on each loop
                    if not self._subscriptions.is_assigned(tp):
                        log.debug("Not returning fetched records for partition %s"
                                  " since it is no longer assigned", tp)
                        break

                    self._subscriptions.assignment[tp].position = msg.offset + 1
                    yield msg
            else:
                # these records aren't next in line based on the last consumed
                # position, ignore them they must be from an obsolete request
                log.warning("Ignoring fetched records for %s at offset %s",
                            tp, fetch_offset)

                # Send any additional FetchRequests that we can now
                # this will likely fetch each partition individually, rather than
                # fetch multiple partitions in bulk when they are on the same broker
                self.init_fetches()
                log.debug("Ignoring fetched records for %s at offset %s",
                          tp, fetch_offset)

    def __iter__(self): # pylint: disable=non-iterator-returned
        return self
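Because the block above yields from inside a generator, partition assignment can change between one `yield` and the next, which is why it re-checks `is_assigned` on every loop iteration. A stripped-down illustration of the hazard (toy objects, not the real SubscriptionState):

```python
def drain(messages, assigned):
    """Yield messages only while the partition stays in the assigned set."""
    for msg in messages:
        # State may have changed while the caller was holding the previous
        # message, so re-check before handing out the next one.
        if 'tp-0' not in assigned:
            break
        yield msg


assigned = {'tp-0'}
it = drain(['m1', 'm2', 'm3'], assigned)
print(next(it))           # 'm1'
assigned.discard('tp-0')  # e.g. a rebalance happens between yields
print(list(it))           # [] -- the remaining messages are dropped
```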
@@ -469,8 +509,7 @@ class Fetcher(six.Iterator):
    def _create_fetch_requests(self):
        """Create fetch requests for all assigned partitions, grouped by node.

        FetchRequests skipped if no leader, node has requests in flight, or we
        have not returned all previously fetched records to consumer
        FetchRequests skipped if no leader, or node has requests in flight

        Returns:
            dict: {node_id: [FetchRequest,...]}
@@ -486,9 +525,7 @@ class Fetcher(six.Iterator):
                          " Requesting metadata update", partition)
                self._client.cluster.request_update()
            elif self._client.in_flight_request_count(node_id) == 0:
                # if there is a leader and no in-flight requests,
                # issue a new fetch but only fetch data for partitions whose
                # previously fetched data has been consumed
                # fetch if there is a leader and no in-flight requests
                position = self._subscriptions.assignment[partition].position
                partition_info = (
                    partition.partition,
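Per the simplified comment above, request creation now fetches for any partition whose leader node has no requests in flight, grouping partitions by leader so that one FetchRequest per broker covers all of its partitions. A rough sketch of that grouping step (hypothetical helper, not the real `_create_fetch_requests`):

```python
import collections


def group_fetchable_by_leader(partitions, leader_for, in_flight_count):
    """Group fetchable partitions by their leader node id."""
    by_node = collections.defaultdict(list)
    for partition in partitions:
        node_id = leader_for(partition)
        if node_id is None:
            # unknown leader -- the real code asks for a metadata update here
            continue
        if in_flight_count(node_id) == 0:
            # only fetch from brokers that are not already busy with a request
            by_node[node_id].append(partition)
    return by_node


leaders = {'topic-0': 1, 'topic-1': 1, 'topic-2': 2}
grouped = group_fetchable_by_leader(['topic-0', 'topic-1', 'topic-2'],
                                    leaders.get,
                                    lambda node_id: 0)
print(dict(grouped))  # {1: ['topic-0', 'topic-1'], 2: ['topic-2']}
```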
@@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator):
        self._fetcher.update_fetch_positions(partitions)

    def _message_generator(self):
        assert self.assignment() or self.subscription() is not None
        while time.time() < self._consumer_timeout:
            if self.config['api_version'] >= (0, 8, 2):
                self._coordinator.ensure_coordinator_known()
@@ -626,21 +627,43 @@ class KafkaConsumer(six.Iterator):
                partitions = self._subscription.missing_fetch_positions()
                self._update_fetch_positions(partitions)

            # init any new fetches (won't resend pending fetches)
            self._fetcher.init_fetches()
            self._client.poll(
                max(0, self._consumer_timeout - time.time()) * 1000)

            # We need to make sure we at least keep up with scheduled tasks,
            # like heartbeats, auto-commits, and metadata refreshes
            timeout_at = min(self._consumer_timeout,
                             self._client._delayed_tasks.next_at() + time.time(),
                             self._client.cluster.ttl() / 1000.0 + time.time())

            if self.config['api_version'] >= (0, 9):
                if not self.assignment():
                    sleep_time = time.time() - timeout_at
                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
                    time.sleep(sleep_time)
                    continue

            poll_ms = 1000 * (time.time() - self._consumer_timeout)

            # Dont bother blocking if there are no fetches
            if not self._fetcher.in_flight_fetches():
                poll_ms = 0

            self._client.poll(poll_ms)

            if time.time() > timeout_at:
                continue

            for msg in self._fetcher:
                yield msg
                if time.time() > timeout_at:
                    log.debug("internal iterator timeout - breaking for poll")
                    break

            # an else block on a for loop only executes if there was no break
            # so this should only be called on a StopIteration from the fetcher
            # and we assume that it is safe to init_fetches when fetcher is done
            # i.e., there are no more records stored internally
            else:
                self._fetcher.init_fetches()

    def __iter__(self): # pylint: disable=non-iterator-returned
        return self
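Two details above are worth calling out: `poll_ms` is forced to 0 when no fetches are in flight, so the client never blocks idly, and the `for`/`else` is standard Python, with the `else` clause running only when the loop finishes without `break`, i.e. only after the fetcher raises StopIteration. A minimal demonstration of the `for`/`else` behaviour:

```python
def drain(batches, stop_after=None):
    for i, batch in enumerate(batches):
        print('processing %s' % batch)
        if stop_after is not None and i + 1 >= stop_after:
            break
    else:
        # Runs only when the loop was NOT interrupted by break -- here, the
        # buffer is exhausted, so it would be safe to issue new fetches.
        print('buffer drained, issuing new fetches')


drain(['a', 'b'])                # prints the "buffer drained" line
drain(['a', 'b'], stop_after=1)  # breaks early, else clause is skipped
```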
@@ -648,17 +671,19 @@ class KafkaConsumer(six.Iterator):
        if not self._iterator:
            self._iterator = self._message_generator()

        # consumer_timeout_ms can be used to stop iteration early
        if self.config['consumer_timeout_ms'] >= 0:
            self._consumer_timeout = time.time() + (
                self.config['consumer_timeout_ms'] / 1000.0)

        self._set_consumer_timeout()
        try:
            return next(self._iterator)
        except StopIteration:
            self._iterator = None
            raise

    def _set_consumer_timeout(self):
        # consumer_timeout_ms can be used to stop iteration early
        if self.config['consumer_timeout_ms'] >= 0:
            self._consumer_timeout = time.time() + (
                self.config['consumer_timeout_ms'] / 1000.0)

    # old KafkaConsumer methods are deprecated
    def configure(self, **configs):
        raise NotImplementedError(
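The surrounding `__next__` lazily creates the message generator and discards it on StopIteration so that a later call can rebuild it and resume. The bare pattern, reduced to a toy class (not the real KafkaConsumer):

```python
class ResumableIterator(object):
    """Toy version of the lazy-generator pattern used in __next__ above."""

    def __init__(self, batches):
        self._batches = batches
        self._iterator = None

    def _message_generator(self):
        while self._batches:
            yield self._batches.pop(0)

    def __iter__(self):
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._message_generator()
        try:
            return next(self._iterator)
        except StopIteration:
            # Drop the exhausted generator; a later call builds a fresh one,
            # so iteration can be resumed after a timeout.
            self._iterator = None
            raise

    next = __next__  # python 2 compatibility


it = ResumableIterator(['m1', 'm2'])
print(list(it))   # ['m1', 'm2']
it._batches.append('m3')
print(list(it))   # ['m3'] -- iteration resumes after being exhausted
```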