From 0adb71af3858a8f4d1cfb3fe072989499b3b3c4f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 22:47:13 -0800 Subject: [PATCH 1/9] Fetcher logging should be debug or trace (left higher during testing) --- kafka/consumer/fetcher.py | 20 +++++++++++--------- kafka/consumer/group.py | 1 + 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index eb85060..6446f4a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -301,9 +301,9 @@ class Fetcher(six.Iterator): elif fetch_offset == position: next_offset = messages[-1][0] + 1 - log.debug("Returning fetched records at offset %d for assigned" - " partition %s and update position to %s", position, - tp, next_offset) + log.log(0, "Returning fetched records at offset %d for assigned" + " partition %s and update position to %s", position, + tp, next_offset) self._subscriptions.assignment[tp].position = next_offset for record in self._unpack_message_set(tp, messages): @@ -344,8 +344,8 @@ class Fetcher(six.Iterator): if not self._subscriptions.is_assigned(tp): # this can happen when a rebalance happened before # fetched records are returned - log.warning("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) + log.debug("Not returning fetched records for partition %s" + " since it is no longer assigned", tp) continue # note that the consumed position should always be available @@ -354,18 +354,20 @@ class Fetcher(six.Iterator): if not self._subscriptions.is_fetchable(tp): # this can happen when a partition consumption paused before # fetched records are returned - log.warning("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) + log.debug("Not returning fetched records for assigned partition" + " %s since it is no longer fetchable", tp) elif fetch_offset == position: + log.log(0, "Returning fetched records at offset %d for assigned" + " partition %s", position, tp) for msg in self._unpack_message_set(tp, messages): self._subscriptions.assignment[tp].position = msg.offset + 1 yield msg else: # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request - log.warning("Ignoring fetched records for %s at offset %s", - tp, fetch_offset) + log.debug("Ignoring fetched records for %s at offset %s", + tp, fetch_offset) # Send any additional FetchRequests that we can now # this will likely fetch each partition individually, rather than diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 704c994..d83c452 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -639,6 +639,7 @@ class KafkaConsumer(six.Iterator): for msg in self._fetcher: yield msg if time.time() > timeout_at: + log.debug("internal iterator timeout - breaking for poll") break def __iter__(self): # pylint: disable=non-iterator-returned From 76e7d13bdd736aa23507a336d04ec025636f9404 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 22:48:09 -0800 Subject: [PATCH 2/9] Check for assignment changes before yielding new record --- kafka/consumer/fetcher.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6446f4a..91d3711 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -361,6 +361,15 @@ class Fetcher(six.Iterator): log.log(0, "Returning fetched records at offset %d for assigned" " partition %s", position, tp) for msg in self._unpack_message_set(tp, messages): + + # Because we are in a generator, it is possible for + # assignment to change between yield calls + # so we need to re-check on each loop + if not self._subscriptions.is_assigned(tp): + log.debug("Not returning fetched records for partition %s" + " since it is no longer assigned", tp) + break + self._subscriptions.assignment[tp].position = msg.offset + 1 yield msg else: From 458bdb50f62a0fa2556bca11cf6cc68c6e935ca6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 22:53:35 -0800 Subject: [PATCH 3/9] Reorganize init_fetches calls during iteration Generally should not init_fetches while the generator has pending messages; this revision adds an explicit check / noop to the public interface, and uses a private method internally to attempt to pipeline fetch requests. --- kafka/consumer/fetcher.py | 22 +++++++++++++++++----- kafka/consumer/group.py | 5 +++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 91d3711..c7d567e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -86,9 +86,21 @@ class Fetcher(six.Iterator): def init_fetches(self): """Send FetchRequests asynchronously for all assigned partitions. + Note: noop if there are unconsumed records internal to the fetcher + Returns: List of Futures: each future resolves to a FetchResponse """ + # We need to be careful when creating fetch records during iteration + # so we verify that there are no records in the deque, or in an + # iterator + if self._records or self._iterator: + log.debug('Skipping init_fetches because there are unconsumed' + ' records internally') + return [] + return self._init_fetches() + + def _init_fetches(self): futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -339,6 +351,11 @@ class Fetcher(six.Iterator): self._raise_if_unauthorized_topics() self._raise_if_record_too_large() + # Send additional FetchRequests when the internal queue is low + # this should enable moderate pipelining + if len(self._records) == 1: + self._init_fetches() + (fetch_offset, tp, messages) = self._records.popleft() if not self._subscriptions.is_assigned(tp): @@ -378,11 +395,6 @@ class Fetcher(six.Iterator): log.debug("Ignoring fetched records for %s at offset %s", tp, fetch_offset) - # Send any additional FetchRequests that we can now - # this will likely fetch each partition individually, rather than - # fetch multiple partitions in bulk when they are on the same broker - self.init_fetches() - def __iter__(self): # pylint: disable=non-iterator-returned return self diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index d83c452..bd977c5 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) - # init any new fetches (won't resend pending fetches) - self._fetcher.init_fetches() self._client.poll( max(0, self._consumer_timeout - time.time()) * 1000) @@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator): if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + else: + self._fetcher.init_fetches() def __iter__(self): # pylint: disable=non-iterator-returned return self @@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator): def __next__(self): if not self._iterator: self._iterator = self._message_generator() + self._fetcher.init_fetches() # consumer_timeout_ms can be used to stop iteration early if self.config['consumer_timeout_ms'] >= 0: From 1c4a8bfc671282c002e39fd67afd5f4ccef0ee4c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 22:58:11 -0800 Subject: [PATCH 4/9] Update docstring and comments in _create_fetch_requests re KAFKA-2978 --- kafka/consumer/fetcher.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c7d567e..fe29e77 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -492,8 +492,7 @@ class Fetcher(six.Iterator): def _create_fetch_requests(self): """Create fetch requests for all assigned partitions, grouped by node. - FetchRequests skipped if no leader, node has requests in flight, or we - have not returned all previously fetched records to consumer + FetchRequests skipped if no leader, or node has requests in flight Returns: dict: {node_id: [FetchRequest,...]} @@ -509,9 +508,7 @@ class Fetcher(six.Iterator): " Requesting metadata update", partition) self._client.cluster.request_update() elif self._client.in_flight_request_count(node_id) == 0: - # if there is a leader and no in-flight requests, - # issue a new fetch but only fetch data for partitions whose - # previously fetched data has been consumed + # fetch if there is a leader and no in-flight requests position = self._subscriptions.assignment[partition].position partition_info = ( partition.partition, From e58b447b8e9a7eaa307244b7a315c19ac00381a0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 12 Jan 2016 14:38:16 -0800 Subject: [PATCH 5/9] Use private deque to track in-flight fetchrequests --- kafka/consumer/fetcher.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index fe29e77..6cafb65 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -80,6 +80,7 @@ class Fetcher(six.Iterator): self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None + self._fetch_futures = collections.deque() #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) @@ -109,8 +110,23 @@ class Fetcher(six.Iterator): future.add_callback(self._handle_fetch_response, request) future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) futures.append(future) + self._fetch_futures.extend(futures) + self._clean_done_fetch_futures() return futures + def _clean_done_fetch_futures(self): + while True: + if not self._fetch_futures: + break + if not self._fetch_futures[0].is_done: + break + self._fetch_futures.popleft() + + def in_flight_fetches(self): + """Return True if there are any unprocessed FetchRequests in flight.""" + self._clean_done_fetch_futures() + return bool(self._fetch_futures) + def update_fetch_positions(self, partitions): """Update the fetch positions for the provided partitions. From cc3e1cc9a17de52a3ab7955548b8bae945777a97 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 12 Jan 2016 14:41:22 -0800 Subject: [PATCH 6/9] Attempt to pipeline fetchrequests in iterator --- kafka/consumer/fetcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6cafb65..f116bed 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -36,6 +36,7 @@ class Fetcher(six.Iterator): 'fetch_max_wait_ms': 500, 'max_partition_fetch_bytes': 1048576, 'check_crcs': True, + 'iterator_refetch_records': 1, # undocumented -- interface may change } def __init__(self, client, subscriptions, **configs): @@ -369,7 +370,7 @@ class Fetcher(six.Iterator): # Send additional FetchRequests when the internal queue is low # this should enable moderate pipelining - if len(self._records) == 1: + if len(self._records) <= self.config['iterator_refetch_records']: self._init_fetches() (fetch_offset, tp, messages) = self._records.popleft() From dcd62b72e39df00da23e13d783fa5681a20e381b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 12 Jan 2016 14:42:17 -0800 Subject: [PATCH 7/9] Move consumer_timeout handling to private method --- kafka/consumer/group.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index bd977c5..141c1fa 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -650,17 +650,19 @@ class KafkaConsumer(six.Iterator): self._iterator = self._message_generator() self._fetcher.init_fetches() - # consumer_timeout_ms can be used to stop iteration early - if self.config['consumer_timeout_ms'] >= 0: - self._consumer_timeout = time.time() + ( - self.config['consumer_timeout_ms'] / 1000.0) - + self._set_consumer_timeout() try: return next(self._iterator) except StopIteration: self._iterator = None raise + def _set_consumer_timeout(self): + # consumer_timeout_ms can be used to stop iteration early + if self.config['consumer_timeout_ms'] >= 0: + self._consumer_timeout = time.time() + ( + self.config['consumer_timeout_ms'] / 1000.0) + # old KafkaConsumer methods are deprecated def configure(self, **configs): raise NotImplementedError( From 22e84a57cb0a33aef3b37ed0515a85244d3a1615 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 12 Jan 2016 14:43:49 -0800 Subject: [PATCH 8/9] Remove sleep call in client.poll -- expect callers to manage this and log warning --- kafka/client_async.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 88b8ec6..577229a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -338,17 +338,16 @@ class KafkaClient(object): # select on reads across all connected sockets, blocking up to timeout sockets = dict([(conn._sock, conn) for conn in six.itervalues(self._conns) - if (conn.state is ConnectionStates.CONNECTED - and conn.in_flight_requests)]) + if conn.state is ConnectionStates.CONNECTED + and conn.in_flight_requests]) if not sockets: # if sockets are connecting, we can wake when they are writeable if self._connecting: sockets = [self._conns[node]._sock for node in self._connecting] select.select([], sockets, [], timeout) - # otherwise just sleep to prevent CPU spinning - else: - log.debug('Nothing to do in _poll -- sleeping for %s', timeout) - time.sleep(timeout) + elif timeout: + log.warning('_poll called with a timeout, but nothing to do' + ' -- this can cause high CPU usage during idle') return [] ready, _, _ = select.select(list(sockets.keys()), [], [], timeout) From 3e622068ea7a970c8674a518a05355b6065560f1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 12 Jan 2016 14:46:02 -0800 Subject: [PATCH 9/9] Sleep in KafkaConsumer iterator if no partition assignment; dont block in poll if no in-flight fetches --- kafka/consumer/group.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 141c1fa..333ef64 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator): self._fetcher.update_fetch_positions(partitions) def _message_generator(self): + assert self.assignment() or self.subscription() is not None while time.time() < self._consumer_timeout: if self.config['api_version'] >= (0, 8, 2): self._coordinator.ensure_coordinator_known() @@ -626,19 +627,40 @@ class KafkaConsumer(six.Iterator): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) - self._client.poll( - max(0, self._consumer_timeout - time.time()) * 1000) - + # We need to make sure we at least keep up with scheduled tasks, + # like heartbeats, auto-commits, and metadata refreshes timeout_at = min(self._consumer_timeout, self._client._delayed_tasks.next_at() + time.time(), self._client.cluster.ttl() / 1000.0 + time.time()) + + if self.config['api_version'] >= (0, 9): + if not self.assignment(): + sleep_time = time.time() - timeout_at + log.debug('No partitions assigned; sleeping for %s', sleep_time) + time.sleep(sleep_time) + continue + + poll_ms = 1000 * (time.time() - self._consumer_timeout) + + # Dont bother blocking if there are no fetches + if not self._fetcher.in_flight_fetches(): + poll_ms = 0 + + self._client.poll(poll_ms) + if time.time() > timeout_at: continue + for msg in self._fetcher: yield msg if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + + # an else block on a for loop only executes if there was no break + # so this should only be called on a StopIteration from the fetcher + # and we assume that it is safe to init_fetches when fetcher is done + # i.e., there are no more records stored internally else: self._fetcher.init_fetches() @@ -648,7 +670,6 @@ class KafkaConsumer(six.Iterator): def __next__(self): if not self._iterator: self._iterator = self._message_generator() - self._fetcher.init_fetches() self._set_consumer_timeout() try: