Merge pull request #583 from dpkp/consumer_heartbeat_fixes

Fix Consumer Heartbeat Bugs
Dana Powers
2016-03-12 19:01:20 -08:00
4 changed files with 107 additions and 44 deletions


@@ -439,14 +439,14 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: map of topic to list of records (may be empty)
         """
-        if self.config['group_id'] is not None:
-            if self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+        if self._use_consumer_group():
+            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_active_group()
+
+        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+            self._coordinator.ensure_coordinator_known()
-            if self.config['api_version'] >= (0, 9):
-                # ensure we have partitions assigned if we expect to
-                if self._subscription.partitions_auto_assigned():
-                    self._coordinator.ensure_active_group()
 
         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -665,6 +665,16 @@ class KafkaConsumer(six.Iterator):
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
 
+    def _use_consumer_group(self):
+        """Return True iff this consumer can/should join a broker-coordinated group."""
+        if self.config['api_version'] < (0, 9):
+            return False
+        elif self.config['group_id'] is None:
+            return False
+        elif not self._subscription.partitions_auto_assigned():
+            return False
+        return True
+
     def _update_fetch_positions(self, partitions):
         """
         Set the fetch position to the committed position (if there is one)
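The new helper above reduces to three independent conditions; a minimal standalone sketch of the same gating (hypothetical function and argument names, not library code):

def use_consumer_group(api_version, group_id, auto_assigned):
    # Broker-coordinated group membership (JoinGroup / SyncGroup / Heartbeat)
    # needs a 0.9+ broker, a configured group_id, and subscription-based
    # (automatic) partition assignment.
    return (api_version >= (0, 9)
            and group_id is not None
            and auto_assigned)

assert use_consumer_group((0, 9), 'my-group', True)
assert not use_consumer_group((0, 8, 2), 'my-group', True)  # kafka offsets only, no group protocol
assert not use_consumer_group((0, 9), None, True)           # no group configured
assert not use_consumer_group((0, 9), 'my-group', False)    # manual assign()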
@@ -690,17 +700,16 @@ class KafkaConsumer(six.Iterator):
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
-            if self.config['group_id'] is not None:
-                if self.config['api_version'] >= (0, 8, 2):
-                    self._coordinator.ensure_coordinator_known()
-
-                if self.config['api_version'] >= (0, 9):
-                    # ensure we have partitions assigned if we expect to
-                    if self._subscription.partitions_auto_assigned():
-                        self._coordinator.ensure_active_group()
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()
-            # fetch positions if we have partitions we're subscribed to that we
-            # don't know the offset for
+
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            # fetch offsets for any subscribed partitions that we aren't tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
@@ -714,14 +723,18 @@ class KafkaConsumer(six.Iterator):
             # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = self._next_timeout()
 
-            if self.config['api_version'] >= (0, 9):
-                if self.config['group_id'] is not None and not self.assignment():
-                    sleep_time = max(timeout_at - time.time(), 0)
-                    if sleep_time > 0 and not self._client.in_flight_request_count():
-                        log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                        time.sleep(sleep_time)
-                        continue
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
 
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
             if time.time() > timeout_at:
                 continue
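The new comment is the key point of this hunk: client.poll() only blocks while waiting on network IO, so a group member with no assigned partitions would otherwise spin. A rough standalone sketch of the idle-sleep pattern (hypothetical function, simplified from the loop above):

import time

def maybe_idle_sleep(timeout_at, has_assignment, in_flight_requests):
    # Nothing assigned and nothing in flight means there is no IO to block on,
    # so sleep until the next scheduled wakeup (heartbeat, auto-commit,
    # metadata refresh) instead of busy-looping.
    if has_assignment or in_flight_requests:
        return False
    sleep_time = max(timeout_at - time.time(), 0)
    if sleep_time > 0:
        time.sleep(sleep_time)
        return True
    return False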
@@ -739,9 +752,21 @@ class KafkaConsumer(six.Iterator):
                 self._fetcher.init_fetches()
 
     def _next_timeout(self):
-        return min(self._consumer_timeout,
-                   self._client._delayed_tasks.next_at() + time.time(),
-                   self._client.cluster.ttl() / 1000.0 + time.time())
+        timeout = min(self._consumer_timeout,
+                      self._client._delayed_tasks.next_at() + time.time(),
+                      self._client.cluster.ttl() / 1000.0 + time.time())
+
+        # Although the delayed_tasks timeout above should cover processing
+        # HeartbeatRequests, it is still possible that HeartbeatResponses
+        # are left unprocessed during a long _fetcher iteration without
+        # an intermediate poll(). And because tasks are responsible for
+        # rescheduling themselves, an unprocessed response will prevent
+        # the next heartbeat from being sent. This check should help
+        # avoid that.
+        if self._use_consumer_group():
+            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+            timeout = min(timeout, heartbeat)
+        return timeout
 
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
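A standalone sketch of the new timeout calculation (hypothetical parameters; the real method pulls these values from the client, the cluster metadata, and the coordinator's Heartbeat object):

import time

def next_timeout(consumer_timeout, next_task_at, metadata_ttl_ms, heartbeat_ttl=None):
    # Wake for whichever comes first: the consumer timeout, the next delayed
    # task, or the metadata refresh deadline...
    timeout = min(consumer_timeout,
                  next_task_at,
                  time.time() + metadata_ttl_ms / 1000.0)
    # ...and, when the consumer is group-managed, never sleep past the next
    # heartbeat deadline, so an unprocessed HeartbeatResponse cannot stall
    # the self-rescheduling task chain.
    if heartbeat_ttl is not None:
        timeout = min(timeout, time.time() + heartbeat_ttl)
    return timeout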


@@ -536,26 +536,27 @@ class BaseCoordinator(object):
         #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("Received successful heartbeat response.")
+            log.info("Heartbeat successful")
             future.success(None)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
-            log.info("Heartbeat failed: coordinator is either not started or"
-                     " not valid; will refresh metadata and retry")
+            log.warning("Heartbeat failed: coordinator is either not started or"
+                        " not valid; will refresh metadata and retry")
             self.coordinator_dead()
             future.failure(error_type())
         elif error_type is Errors.RebalanceInProgressError:
-            log.info("Heartbeat failed: group is rebalancing; re-joining group")
+            log.warning("Heartbeat: group is rebalancing; this consumer needs to"
+                        " re-join")
             self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.IllegalGenerationError:
-            log.info("Heartbeat failed: local generation id is not current;"
-                     " re-joining group")
+            log.warning("Heartbeat: generation id is not current; this consumer"
+                        " needs to re-join")
             self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.UnknownMemberIdError:
-            log.info("Heartbeat failed: local member_id was not recognized;"
-                     " resetting and re-joining group")
+            log.warning("Heartbeat: local member_id was not recognized;"
+                        " this consumer needs to re-join")
             self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
             self.rejoin_needed = True
             future.failure(error_type)
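The handler above boils down to one recovery action per error code; a short summary table (illustrative mapping only, not an object that exists in the library):

HEARTBEAT_ERROR_ACTIONS = {
    'NoError': 'mark future successful',
    'GroupCoordinatorNotAvailableError': 'coordinator_dead(); refresh metadata and retry',
    'NotCoordinatorForGroupError': 'coordinator_dead(); refresh metadata and retry',
    'RebalanceInProgressError': 'set rejoin_needed; consumer must re-join the group',
    'IllegalGenerationError': 'set rejoin_needed; consumer must re-join the group',
    'UnknownMemberIdError': 'reset member_id; set rejoin_needed and re-join',
}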
@@ -594,12 +595,16 @@ class HeartbeatTask(object):
     def __call__(self):
         if (self._coordinator.generation < 0 or
-            self._coordinator.need_rejoin() or
-            self._coordinator.coordinator_unknown()):
+            self._coordinator.need_rejoin()):
             # no need to send the heartbeat we're not using auto-assignment
             # or if we are awaiting a rebalance
-            log.debug("Skipping heartbeat: no auto-assignment"
-                      " or waiting on rebalance")
+            log.info("Skipping heartbeat: no auto-assignment"
+                     " or waiting on rebalance")
             return
 
+        if self._coordinator.coordinator_unknown():
+            log.warning("Coordinator unknown during heartbeat -- will retry")
+            self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
+            return
+
         if self._heartbeat.session_expired():
@@ -629,7 +634,7 @@ class HeartbeatTask(object):
             self._client.schedule(self, time.time() + ttl)
 
     def _handle_heartbeat_failure(self, e):
-        log.debug("Heartbeat failed; retrying")
+        log.warning("Heartbeat failed; retrying")
         self._request_in_flight = False
         etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
         self._client.schedule(self, etd)
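The behavioural fix in this file is easiest to see as a scheduling decision: an unknown coordinator is now a retried failure instead of a silent skip that ends the self-rescheduling heartbeat chain. A rough sketch of that decision (hypothetical function, not the task's real interface):

import time

def heartbeat_step(generation, need_rejoin, coordinator_unknown,
                   retry_backoff_ms, now=None):
    # Returns (action, next_run_at): 'skip' ends the chain (no group membership
    # or awaiting rebalance), 'retry' reschedules after retry_backoff_ms,
    # 'send' issues a HeartbeatRequest now.
    now = time.time() if now is None else now
    if generation < 0 or need_rejoin:
        return ('skip', None)
    if coordinator_unknown:
        # previously this case was also skipped, which silently stopped heartbeating
        return ('retry', now + retry_backoff_ms / 1000.0)
    return ('send', now)

assert heartbeat_step(-1, False, False, 100)[0] == 'skip'
assert heartbeat_step(0, False, True, 100, now=0.0) == ('retry', 0.1)
assert heartbeat_step(0, False, False, 100, now=0.0) == ('send', 0.0)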


@@ -1,16 +1,17 @@
 import collections
 import logging
 import threading
 import os
 import time
 
 import pytest
 import six
 
-from kafka import SimpleClient, SimpleProducer
+from kafka import SimpleClient
 from kafka.common import TopicPartition
-from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.conn import ConnectionStates
 from kafka.consumer.group import KafkaConsumer
+from kafka.future import Future
+from kafka.protocol.metadata import MetadataResponse
 
 from test.conftest import version
 from test.testutil import random_string
@@ -115,3 +116,23 @@ def test_group(kafka_broker, topic):
     finally:
         for c in range(num_consumers):
             stop[c].set()
+
+
+@pytest.fixture
+def conn(mocker):
+    conn = mocker.patch('kafka.client_async.BrokerConnection')
+    conn.return_value = conn
+    conn.state = ConnectionStates.CONNECTED
+    conn.send.return_value = Future().success(
+        MetadataResponse(
+            [(0, 'foo', 12), (1, 'bar', 34)],  # brokers
+            []))  # topics
+    return conn
+
+
+def test_heartbeat_timeout(conn, mocker):
+    mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
+    mocker.patch('time.time', return_value = 1234)
+    consumer = KafkaConsumer('foobar')
+    mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
+    assert consumer._next_timeout() == 1234
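With time.time() pinned to 1234 and heartbeat.ttl() patched to return 0, the heartbeat bound computed in _next_timeout() is 1234 + 0 = 1234; since the method returns the minimum of its bounds, asserting equality with 1234 verifies both that the heartbeat deadline is included and that no other bound is mistakenly earlier than "now".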


@@ -380,16 +380,20 @@ def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
 def patched_coord(mocker, coordinator):
     coordinator._subscription.subscribe(topics=['foobar'])
     coordinator._subscription.needs_partition_assignment = False
-    mocker.patch.object(coordinator, 'coordinator_unknown')
-    coordinator.coordinator_unknown.return_value = False
+    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
     coordinator.coordinator_id = 0
+    coordinator.generation = 0
+    mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
     mocker.patch.object(coordinator._client, 'least_loaded_node',
                         return_value=1)
     mocker.patch.object(coordinator._client, 'ready', return_value=True)
     mocker.patch.object(coordinator._client, 'send')
+    mocker.patch.object(coordinator._client, 'schedule')
     mocker.spy(coordinator, '_failed_request')
     mocker.spy(coordinator, '_handle_offset_commit_response')
     mocker.spy(coordinator, '_handle_offset_fetch_response')
+    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
+    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
     return coordinator
@@ -573,3 +577,11 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
     assert future.value == offsets
     assert patched_coord.coordinator_id is (None if dead else 0)
     assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+def test_heartbeat(patched_coord):
+    patched_coord.coordinator_unknown.return_value = True
+
+    patched_coord.heartbeat_task()
+    assert patched_coord._client.schedule.call_count == 1
+    assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1