Merge pull request #623 from dpkp/kafka-3318

KAFKA-3318: clean up consumer logging and error messages
Dana Powers
2016-04-05 12:40:44 -07:00
12 changed files with 121 additions and 99 deletions
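Most of the changes below demote log messages for retriable errors from WARNING/INFO to DEBUG and add the group id (and, where relevant, the coordinator node id) to each message. A minimal sketch of how an application might surface the demoted messages while troubleshooting a consumer, assuming the loggers follow kafka-python's module paths (e.g. kafka.coordinator.base, kafka.consumer.fetcher):

    import logging

    # Route client logs to the root handler and raise only the consumer
    # internals touched by this commit to DEBUG.
    logging.basicConfig(
        format='%(asctime)s %(name)s %(levelname)s %(message)s',
        level=logging.INFO)
    logging.getLogger('kafka.coordinator').setLevel(logging.DEBUG)
    logging.getLogger('kafka.consumer.fetcher').setLevel(logging.DEBUG)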

View File

@@ -3,7 +3,6 @@ from __future__ import absolute_import
import collections
import copy
import logging
import random
import threading
import time

View File

@@ -511,13 +511,13 @@ class Fetcher(six.Iterator):
future.success(offset)
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
log.warning("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
else:
log.error("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
future.failure(error_type(partition))
def _create_fetch_requests(self):
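The Fetcher change above only lowers the severity of retriable offset-request failures; the failed Future is still retried by the caller. For context, a hedged sketch of consumer calls that exercise this offset-request path (the topic name and broker address are placeholders, and TopicPartition is assumed importable from kafka.structs as elsewhere in this diff):

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             enable_auto_commit=False)
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    consumer.seek_to_beginning(tp)  # issues the offset request handled above
    print(consumer.position(tp))    # blocks until the offset reset completes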

View File

@@ -26,7 +26,7 @@ from .base import (
)
from ..common import (
FetchRequestPayload, KafkaError, OffsetRequestPayload,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
ConsumerFetchSizeTooSmall,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)

View File

@@ -200,7 +200,7 @@ class BaseCoordinator(object):
self._client.poll()
continue
future = self._send_group_metadata_request()
future = self._send_group_coordinator_request()
self._client.poll(future=future)
if future.failed():
@@ -233,7 +233,7 @@ class BaseCoordinator(object):
while self.need_rejoin():
self.ensure_coordinator_known()
future = self._perform_group_join()
future = self._send_join_group_request()
self._client.poll(future=future)
if future.succeeded():
@@ -253,7 +253,7 @@ class BaseCoordinator(object):
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _perform_group_join(self):
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
This function handles both JoinGroup and SyncGroup, delegating to
@@ -268,7 +268,7 @@ class BaseCoordinator(object):
return Future().failure(e)
# send a join group request to the coordinator
log.debug("(Re-)joining group %s", self.group_id)
log.info("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest(
self.group_id,
self.config['session_timeout_ms'],
@@ -279,7 +279,7 @@ class BaseCoordinator(object):
for protocol, metadata in self.group_protocols()])
# create the request for the coordinator
log.debug("Issuing request (%s) to coordinator %s", request, self.coordinator_id)
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_join_group_response, future)
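The renamed request methods all follow kafka-python's internal Future pattern shown here: the low-level send returns a future, a response handler is attached as a callback, and that handler resolves the caller's Future with either success() or failure(). A minimal sketch of the pattern, assuming only the Future API (success, failure, add_callback, add_errback) visible in these hunks:

    import logging

    from kafka.future import Future

    log = logging.getLogger(__name__)

    def handle_response(future, response):
        # Mirrors the _handle_*_response methods: resolve the caller's
        # future exactly once, with either a value or an exception.
        if response.get('error') is None:
            future.success(response['value'])
        else:
            future.failure(Exception(response['error']))

    f = Future()
    f.add_callback(log.info)    # invoked with the value on success
    f.add_errback(log.error)    # invoked with the exception on failure
    handle_response(f, {'value': 42, 'error': None})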
@@ -300,6 +300,8 @@ class BaseCoordinator(object):
def _handle_join_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
self.member_id = response.member_id
self.generation = response.generation_id
self.rejoin_needed = False
@@ -315,30 +317,31 @@ class BaseCoordinator(object):
self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator is"
" loading the group.", self.group_id)
log.debug("Attempt to join group %s rejected since coordinator %s"
" is loading the group.", self.group_id, self.coordinator_id)
# backoff and retry
future.failure(error_type(response))
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
error = error_type(self.member_id)
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
log.info("Attempt to join group %s failed due to unknown member id,"
" resetting and retrying.", self.group_id)
log.debug("Attempt to join group %s failed due to unknown member id",
self.group_id)
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
# re-discover the coordinator and retry with backoff
self.coordinator_dead()
log.info("Attempt to join group %s failed due to obsolete "
"coordinator information, retrying.", self.group_id)
log.debug("Attempt to join group %s failed due to obsolete "
"coordinator information: %s", self.group_id,
error_type.__name__)
future.failure(error_type())
elif error_type in (Errors.InconsistentGroupProtocolError,
Errors.InvalidSessionTimeoutError,
Errors.InvalidGroupIdError):
# log the error and re-throw the exception
error = error_type(response)
log.error("Attempt to join group %s failed due to: %s",
log.error("Attempt to join group %s failed due to fatal error: %s",
self.group_id, error)
future.failure(error)
elif error_type is Errors.GroupAuthorizationFailedError:
@@ -356,8 +359,8 @@ class BaseCoordinator(object):
self.generation,
self.member_id,
{})
log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
request, self.coordinator_id)
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
def _on_join_leader(self, response):
@@ -386,8 +389,8 @@ class BaseCoordinator(object):
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in six.iteritems(group_assignment)])
log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
request, self.coordinator_id)
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
def _send_sync_group_request(self, request):
@@ -404,8 +407,8 @@ class BaseCoordinator(object):
def _handle_sync_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful sync group response for group %s: %s",
self.group_id, response)
log.info("Successfully joined group %s with generation %s",
self.group_id, self.generation)
#self.sensors.syncLatency.record(response.requestLatencyMs())
future.success(response.member_assignment)
return
@@ -415,21 +418,19 @@ class BaseCoordinator(object):
if error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.RebalanceInProgressError:
log.info("SyncGroup for group %s failed due to coordinator"
" rebalance, rejoining the group", self.group_id)
log.debug("SyncGroup for group %s failed due to coordinator"
" rebalance", self.group_id)
future.failure(error_type(self.group_id))
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError):
error = error_type()
log.info("SyncGroup for group %s failed due to %s,"
" rejoining the group", self.group_id, error)
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
error = error_type()
log.info("SyncGroup for group %s failed due to %s, will find new"
" coordinator and rejoin", self.group_id, error)
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.coordinator_dead()
future.failure(error)
else:
@@ -437,7 +438,7 @@ class BaseCoordinator(object):
log.error("Unexpected error from SyncGroup: %s", error)
future.failure(error)
def _send_group_metadata_request(self):
def _send_group_coordinator_request(self):
"""Discover the current coordinator for the group.
Returns:
@@ -447,7 +448,8 @@ class BaseCoordinator(object):
if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
log.debug("Issuing group metadata request to broker %s", node_id)
log.debug("Sending group coordinator request for group %s to broker %s",
self.group_id, node_id)
request = GroupCoordinatorRequest(self.group_id)
future = Future()
_f = self._client.send(node_id, request)
@@ -456,7 +458,7 @@ class BaseCoordinator(object):
return future
def _handle_group_coordinator_response(self, future, response):
log.debug("Group metadata response %s", response)
log.debug("Received group coordinator response %s", response)
if not self.coordinator_unknown():
# We already found the coordinator, so ignore the request
log.debug("Coordinator already known -- ignoring metadata response")
@@ -473,6 +475,8 @@ class BaseCoordinator(object):
return
self.coordinator_id = response.coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
self._client.ready(self.coordinator_id)
# start sending heartbeats only if we have a valid generation
@@ -495,8 +499,8 @@ class BaseCoordinator(object):
def coordinator_dead(self, error=None):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
log.warning("Marking the coordinator dead (node %s): %s.",
self.coordinator_id, error)
log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
self.coordinator_id, self.group_id, error)
self.coordinator_id = None
def close(self):
@@ -542,22 +546,24 @@ class BaseCoordinator(object):
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.info("Heartbeat successful")
log.debug("Received successful heartbeat response for group %s",
self.group_id)
future.success(None)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
log.warning("Heartbeat failed: coordinator is either not started or"
" not valid; will refresh metadata and retry")
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
self.coordinator_id)
self.coordinator_dead()
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
log.warning("Heartbeat: group is rebalancing; this consumer needs to"
" re-join")
log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
log.warning("Heartbeat: generation id is not current; this consumer"
" needs to re-join")
log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:

View File

@@ -198,15 +198,18 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task.enable()
assigned = set(self._subscription.assigned_partitions())
log.debug("Set newly assigned partitions %s", assigned)
log.info("Setting newly assigned partitions %s for group %s",
assigned, self.group_id)
# execute the user's callback after rebalance
if self._subscription.listener:
try:
self._subscription.listener.on_partitions_assigned(assigned)
except Exception:
log.exception("User provided listener failed on partition"
" assignment: %s", assigned)
log.exception("User provided listener %s for group %s"
" failed on partition assignment: %s",
self._subscription.listener, self.group_id,
assigned)
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
@@ -226,12 +229,13 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.group_subscribe(all_subscribed_topics)
self._client.set_topics(self._subscription.group_subscription())
log.debug("Performing %s assignment for subscriptions %s",
assignor.name, member_metadata)
log.debug("Performing assignment for group %s using strategy %s"
" with subscriptions %s", self.group_id, assignor.name,
member_metadata)
assignments = assignor.assign(self._cluster, member_metadata)
log.debug("Finished assignment: %s", assignments)
log.debug("Finished assignment for group %s: %s", self.group_id, assignments)
group_assignment = {}
for member_id, assignment in six.iteritems(assignments):
@@ -243,15 +247,16 @@ class ConsumerCoordinator(BaseCoordinator):
self._maybe_auto_commit_offsets_sync()
# execute the user's callback before rebalance
log.debug("Revoking previously assigned partitions %s",
self._subscription.assigned_partitions())
log.info("Revoking previously assigned partitions %s for group %s",
self._subscription.assigned_partitions(), self.group_id)
if self._subscription.listener:
try:
revoked = set(self._subscription.assigned_partitions())
self._subscription.listener.on_partitions_revoked(revoked)
except Exception:
log.exception("User provided subscription listener failed"
" on_partitions_revoked")
log.exception("User provided subscription listener %s"
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)
self._subscription.mark_for_reassignment()
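Both exception-logging changes above wrap the user-supplied rebalance listener callbacks; any exception a listener raises is caught and logged along with the listener, group id, and partitions. A hedged sketch of such a listener, assuming ConsumerRebalanceListener is the base class defined in kafka.consumer.subscription_state:

    import logging

    from kafka.consumer.subscription_state import ConsumerRebalanceListener

    log = logging.getLogger(__name__)

    class CommitOnRevoke(ConsumerRebalanceListener):
        # Exceptions raised in these callbacks are caught and logged by
        # ConsumerCoordinator as shown in the hunks above.
        def __init__(self, consumer):
            self.consumer = consumer

        def on_partitions_revoked(self, revoked):
            log.info("Partitions revoked: %s", revoked)
            self.consumer.commit()  # flush offsets before losing ownership

        def on_partitions_assigned(self, assigned):
            log.info("Partitions assigned: %s", assigned)

A listener like this would be registered via consumer.subscribe(topics=['my-topic'], listener=CommitOnRevoke(consumer)).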
@@ -462,8 +467,8 @@ class ConsumerCoordinator(BaseCoordinator):
) for topic, partitions in six.iteritems(offset_data)]
)
log.debug("Sending offset-commit request with %s to %s",
offsets, node_id)
log.debug("Sending offset-commit request with %s for group %s to %s",
offsets, self.group_id, node_id)
future = Future()
_f = self._client.send(node_id, request)
@@ -482,12 +487,13 @@ class ConsumerCoordinator(BaseCoordinator):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
log.debug("Committed offset %s for partition %s", offset, tp)
log.debug("Group %s committed offset %s for partition %s",
self.group_id, offset, tp)
if self._subscription.is_assigned(tp):
self._subscription.assignment[tp].committed = offset.offset
elif error_type is Errors.GroupAuthorizationFailedError:
log.error("OffsetCommit failed for group %s - %s",
self.group_id, error_type.__name__)
log.error("Not authorized to commit offsets for group %s",
self.group_id)
future.failure(error_type(self.group_id))
return
elif error_type is Errors.TopicAuthorizationFailedError:
@@ -495,24 +501,21 @@ class ConsumerCoordinator(BaseCoordinator):
elif error_type in (Errors.OffsetMetadataTooLargeError,
Errors.InvalidCommitOffsetSizeError):
# raise the error to the user
log.info("OffsetCommit failed for group %s on partition %s"
" due to %s, will retry", self.group_id, tp,
error_type.__name__)
log.debug("OffsetCommit for group %s failed on partition %s"
" %s", self.group_id, tp, error_type.__name__)
future.failure(error_type())
return
elif error_type is Errors.GroupLoadInProgressError:
# just retry
log.info("OffsetCommit failed for group %s because group is"
" initializing (%s), will retry", self.group_id,
error_type.__name__)
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error_type.__name__)
future.failure(error_type(self.group_id))
return
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError,
Errors.RequestTimedOutError):
log.info("OffsetCommit failed for group %s due to a"
" coordinator error (%s), will find new coordinator"
" and retry", self.group_id, error_type.__name__)
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error_type.__name__)
self.coordinator_dead()
future.failure(error_type(self.group_id))
return
@@ -521,22 +524,31 @@ class ConsumerCoordinator(BaseCoordinator):
Errors.RebalanceInProgressError):
# need to re-join group
error = error_type(self.group_id)
log.error("OffsetCommit failed for group %s due to group"
" error (%s), will rejoin", self.group_id, error)
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error)
self._subscription.mark_for_reassignment()
# Errors.CommitFailedError("Commit cannot be completed due to group rebalance"))
future.failure(error)
future.failure(Errors.CommitFailedError(
"Commit cannot be completed since the group has"
" already rebalanced and assigned the partitions to"
" another member. This means that the time between"
" subsequent calls to poll() was longer than the"
" configured session.timeout.ms, which typically"
" implies that the poll loop is spending too much time"
" message processing. You can address this either by"
" increasing the session timeout or by reducing the"
" maximum size of batches returned in poll() with"
" max.poll.records."))
return
else:
log.error("OffsetCommit failed for group % on partition %s"
" with offset %s: %s", self.group_id, tp, offset,
log.error("Group %s failed to commit partition %s at offset"
" %s: %s", self.group_id, tp, offset,
error_type.__name__)
future.failure(error_type())
return
if unauthorized_topics:
log.error("OffsetCommit failed for unauthorized topics %s",
unauthorized_topics)
log.error("Not authorized to commit to topics %s for group %s",
unauthorized_topics, self.group_id)
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
else:
future.success(True)
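The new CommitFailedError replaces the bare group-error failure, so synchronous commits now surface an actionable message when the group has already rebalanced. A hedged sketch of how an application might handle it, assuming CommitFailedError is importable from kafka.common (where this diff adds it) and handle() stands in for the application's processing code:

    from kafka import KafkaConsumer
    from kafka.common import CommitFailedError

    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             enable_auto_commit=False,
                             bootstrap_servers='localhost:9092')
    for message in consumer:
        handle(message)  # hypothetical processing function
        try:
            consumer.commit()
        except CommitFailedError:
            # The group already rebalanced and reassigned this consumer's
            # partitions, typically because processing outlasted the session
            # timeout; the consumer will rejoin on the next iteration.
            pass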
@@ -573,7 +585,8 @@ class ConsumerCoordinator(BaseCoordinator):
node_id)
return Future().failure(Errors.NodeNotReadyError)
log.debug("Fetching committed offsets for partitions: %s", partitions)
log.debug("Group %s fetching committed offsets for partitions: %s",
self.group_id, partitions)
# construct the request
topic_partitions = collections.defaultdict(set)
for tp in partitions:
@@ -605,7 +618,8 @@ class ConsumerCoordinator(BaseCoordinator):
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
error = error_type()
log.debug("Error fetching offset for %s: %s", tp, error_type())
log.debug("Group %s failed to fetch offset for partition"
" %s: %s", self.group_id, tp, error)
if error_type is Errors.GroupLoadInProgressError:
# just retry
future.failure(error)
@@ -629,10 +643,12 @@ class ConsumerCoordinator(BaseCoordinator):
future.failure(error)
return
elif offset >= 0:
# record the position with the offset (-1 indicates no committed offset to fetch)
# record the position with the offset
# (-1 indicates no committed offset to fetch)
offsets[tp] = OffsetAndMetadata(offset, metadata)
else:
log.debug("No committed offset for partition %s", tp)
log.debug("Group %s has no committed offset for partition"
" %s", self.group_id, tp)
future.success(offsets)
@@ -669,8 +685,8 @@ class AutoCommitTask(object):
return
if self._coordinator.coordinator_unknown():
log.debug("Cannot auto-commit offsets because the coordinator is"
" unknown, will retry after backoff")
log.debug("Cannot auto-commit offsets for group %s because the"
" coordinator is unknown", self._coordinator.group_id)
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, time.time() + backoff)
return
@@ -683,18 +699,21 @@ class AutoCommitTask(object):
def _handle_commit_response(self, offsets, result):
self._request_in_flight = False
if result is True:
log.debug("Successfully auto-committed offsets")
log.debug("Successfully auto-committed offsets for group %s",
self._coordinator.group_id)
next_at = time.time() + self._interval
elif not isinstance(result, BaseException):
raise Errors.IllegalStateError(
'Unrecognized result in _handle_commit_response: %s'
% result)
elif hasattr(result, 'retriable') and result.retriable:
log.debug("Failed to auto-commit offsets: %s, will retry"
" immediately", result)
log.debug("Failed to auto-commit offsets for group %s: %s,"
" will retry immediately", self._coordinator.group_id,
result)
next_at = time.time()
else:
log.warning("Auto offset commit failed: %s", result)
log.warning("Auto offset commit failed for group %s: %s",
self._coordinator.group_id, result)
next_at = time.time() + self._interval
if not self._enabled:

View File

@@ -1,8 +1,6 @@
import copy
import time
import kafka.errors as Errors
class Heartbeat(object):
DEFAULT_CONFIG = {

View File

@@ -46,6 +46,10 @@ class UnrecognizedBrokerVersion(KafkaError):
pass
class CommitFailedError(KafkaError):
pass
class BrokerResponseError(KafkaError):
errno = None
message = None

View File

@@ -1,8 +1,6 @@
import functools
import logging
import kafka.errors as Errors
log = logging.getLogger(__name__)

View File

@@ -3,7 +3,6 @@ from __future__ import absolute_import
import atexit
import copy
import logging
import signal
import threading
import time

View File

@@ -6,14 +6,11 @@ import logging
import threading
import time
import six
from .. import errors as Errors
from ..structs import TopicPartition
from ..protocol.message import Message, MessageSet
from .buffer import MessageSetBuffer, SimpleBufferPool
from .future import FutureRecordMetadata, FutureProduceResult
from ..structs import TopicPartition
log = logging.getLogger(__name__)
@@ -81,7 +78,10 @@ class RecordBatch(object):
if ((self.records.is_full() and request_timeout_ms < since_append_ms)
or (request_timeout_ms < (since_append_ms + linger_ms))):
self.records.close()
self.done(-1, Errors.KafkaTimeoutError('Batch Expired'))
self.done(-1, Errors.KafkaTimeoutError(
"Batch containing %s record(s) expired due to timeout while"
" requesting metadata from brokers for %s", self.record_count,
self.topic_partition))
return True
return False
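The expanded KafkaTimeoutError message reaches producer callers when a batch expires while waiting for metadata or a partition leader. A hedged sketch of observing it from the send future (the import path for KafkaTimeoutError and the broker address are assumptions):

    from kafka import KafkaProducer
    from kafka.common import KafkaTimeoutError

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('my-topic', b'payload')
    try:
        metadata = future.get(timeout=30)
    except KafkaTimeoutError as e:
        # Raised with the message above if the batch expired before the
        # cluster metadata / partition leader became available.
        print('delivery failed: %s' % e)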

View File

@@ -3,7 +3,6 @@ from __future__ import absolute_import
from itertools import cycle
import logging
import random
import six
from six.moves import xrange

View File

@@ -482,11 +482,11 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
(OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]),
Errors.RequestTimedOutError, True, False),
(OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]),
Errors.UnknownMemberIdError, False, True),
Errors.CommitFailedError, False, True),
(OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]),
Errors.IllegalGenerationError, False, True),
Errors.CommitFailedError, False, True),
(OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]),
Errors.RebalanceInProgressError, False, True),
Errors.CommitFailedError, False, True),
(OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]),
Errors.InvalidTopicError, False, False),
(OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]),