factor group checking logic to KafkaConsumer._use_consumer_group()
@@ -439,14 +439,14 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: map of topic to list of records (may be empty)
         """
-        if self.config['group_id'] is not None:
-            if self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+        if self._use_consumer_group():
+            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_active_group()
 
-            if self.config['api_version'] >= (0, 9):
-                # ensure we have partitions assigned if we expect to
-                if self._subscription.partitions_auto_assigned():
-                    self._coordinator.ensure_active_group()
+        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+            self._coordinator.ensure_coordinator_known()
+
 
         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
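
Caller-visible behavior is unchanged by this hunk: a consumer constructed with a group_id against a 0.9+ broker still joins its group on the first poll(). A hypothetical usage sketch (broker address, topic, and group names are invented for illustration):

    from kafka import KafkaConsumer

    # All names below are invented for illustration.
    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers='localhost:9092')

    # On a 0.9+ broker this first poll() finds the coordinator, joins the
    # group, and is assigned partitions before any records are fetched.
    records = consumer.poll(timeout_ms=500)
    for tp, messages in records.items():
        print(tp, len(messages))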
@@ -665,6 +665,16 @@ class KafkaConsumer(six.Iterator):
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
 
+    def _use_consumer_group(self):
+        """Return True iff this consumer can/should join a broker-coordinated group."""
+        if self.config['api_version'] < (0, 9):
+            return False
+        elif self.config['group_id'] is None:
+            return False
+        elif not self._subscription.partitions_auto_assigned():
+            return False
+        return True
+
     def _update_fetch_positions(self, partitions):
         """
         Set the fetch position to the committed position (if there is one)
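
The new helper is a pure function of consumer config and subscription state, so its three gates are easy to exercise in isolation. A table-driven sketch (illustrative only, not part of the commit; use_consumer_group below is a hypothetical stand-in that mirrors the method with plain values):

    def use_consumer_group(api_version, group_id, auto_assigned):
        if api_version < (0, 9):
            return False          # broker too old for group coordination
        elif group_id is None:
            return False          # no group to join
        elif not auto_assigned:
            return False          # user called assign(); no rebalancing wanted
        return True

    assert not use_consumer_group((0, 8, 2), 'my-group', True)
    assert not use_consumer_group((0, 9), None, True)
    assert not use_consumer_group((0, 9), 'my-group', False)
    assert use_consumer_group((0, 9), 'my-group', True)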
@@ -690,17 +700,16 @@ class KafkaConsumer(six.Iterator):
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
-            if self.config['group_id'] is not None:
-                if self.config['api_version'] >= (0, 8, 2):
-                    self._coordinator.ensure_coordinator_known()
 
-                if self.config['api_version'] >= (0, 9):
-                    # ensure we have partitions assigned if we expect to
-                    if self._subscription.partitions_auto_assigned():
-                        self._coordinator.ensure_active_group()
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()
 
-            # fetch positions if we have partitions we're subscribed to that we
-            # don't know the offset for
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            # fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
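
The same consolidation is applied inside _message_generator(): the group checks now sit at the top of every loop iteration, so a rebalance that happens between yielded messages is noticed before the next fetch. A condensed sketch of that pattern (maintain_group and fetch_batch are hypothetical stand-ins, not names from the commit):

    import time

    def message_generator(consumer_timeout, maintain_group, fetch_batch):
        # maintain_group(): coordinator discovery / join-group bookkeeping
        # fetch_batch(): returns an iterable of messages (possibly empty)
        while time.time() < consumer_timeout:
            maintain_group()
            for message in fetch_batch():
                yield message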
@@ -714,14 +723,18 @@ class KafkaConsumer(six.Iterator):
             # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = self._next_timeout()
 
-            if self.config['api_version'] >= (0, 9):
-                if self.config['group_id'] is not None and not self.assignment():
-                    sleep_time = max(timeout_at - time.time(), 0)
-                    if sleep_time > 0 and not self._client.in_flight_request_count():
-                        log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                        time.sleep(sleep_time)
-                        continue
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
 
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
             if time.time() > timeout_at:
                 continue
 
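
The last hunk hoists the idle sleep out of the api_version guard: when _use_consumer_group() is false there is no group assignment to wait on, so no sleep is needed regardless of broker version. An illustrative sketch of the sleep arithmetic (maybe_idle_sleep and in_flight are made-up names; in_flight stands in for self._client.in_flight_request_count()):

    import time

    def maybe_idle_sleep(timeout_at, in_flight):
        # timeout_at is an absolute time.time() deadline; never sleep a
        # negative duration, and never sleep while network IO is pending.
        sleep_time = max(timeout_at - time.time(), 0)
        if sleep_time > 0 and not in_flight:
            time.sleep(sleep_time)   # wake exactly at the next timeout
            return True              # caller should 'continue' its loop
        return False

    # With a deadline 0.25s away and no pending requests, sleeps ~0.25s:
    # maybe_idle_sleep(time.time() + 0.25, in_flight=0)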