Add send/receive debug logging to async producer

This commit is contained in:
Dana Powers
2015-06-09 22:28:54 -07:00
parent 7b2f98f5ec
commit 8b0a598045

View File

@@ -101,7 +101,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
count = 0
log.debug('Skipping new batch collection to handle retries')
else:
log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
log.debug('Batching size: %s, timeout: %s', count, timeout)
# Keep fetching till we gather enough messages or a
# timeout is reached
@@ -147,12 +147,14 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
retry_state['do_refresh'] |= True
requests = list(request_tries.keys())
reply = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
log.debug('Sending: %s', requests)
responses = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
for i, response in enumerate(reply):
log.debug('Received: %s', responses)
for i, response in enumerate(responses):
error_cls = None
if isinstance(response, FailedPayloadsError):
error_cls = response.__class__
@@ -164,7 +166,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if error_cls:
_handle_error(error_cls, orig_req)
log.error('Error sending ProduceRequest to %s:%d with msgs %s',
log.error('Error sending ProduceRequest (#%d of %d) to %s:%d '
'with msgs %s', i + 1, len(requests),
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))