Merge pull request #405 from dpkp/log_error_type

Log response error types in consumer and producer logs
This commit is contained in:
Dana Powers
2015-06-10 13:19:40 -07:00
2 changed files with 11 additions and 7 deletions

View File

@@ -358,23 +358,26 @@ class SimpleConsumer(Consumer):
try:
check_error(resp)
except UnknownTopicOrPartitionError:
log.error('UnknownTopicOrPartitionError for %s:%d',
resp.topic, resp.partition)
self.client.reset_topic_metadata(resp.topic)
raise
except NotLeaderForPartitionError:
log.error('NotLeaderForPartitionError for %s:%d',
resp.topic, resp.partition)
self.client.reset_topic_metadata(resp.topic)
continue
except OffsetOutOfRangeError:
log.warning("OffsetOutOfRangeError for %s - %d. "
"Resetting partition offset...",
log.warning('OffsetOutOfRangeError for %s:%d. '
'Resetting partition offset...',
resp.topic, resp.partition)
self.reset_partition_offset(resp.partition)
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
except FailedPayloadsError as e:
log.warning("Failed payloads of %s"
"Resetting partition offset...",
e.payload)
log.warning('FailedPayloadsError for %s:%d',
e.payload.topic, e.payload.partition)
# Retry this partition
retry_partitions[e.payload.partition] = partitions[e.payload.partition]
continue

View File

@@ -166,8 +166,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if error_cls:
_handle_error(error_cls, orig_req)
log.error('Error sending ProduceRequest (#%d of %d) to %s:%d '
'with msgs %s', i + 1, len(requests),
log.error('%s sending ProduceRequest (#%d of %d) '
'to %s:%d with msgs %s',
error_cls.__name__, (i + 1), len(requests),
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))