Make the default case as 'ack on local write'
Also, ensure that the case of 'no-acks' works fine In conn.send(), do not wait for the response. Wait for it only on conn.recv(). This behaviour is fine now since the connection is not shared among consumer threads etc.
This commit is contained in:
@@ -161,12 +161,16 @@ class KafkaClient(object):
|
||||
|
||||
# Send the request, recv the response
|
||||
conn.send(requestId, request)
|
||||
|
||||
if decoder_fn is None:
|
||||
continue
|
||||
|
||||
response = conn.recv(requestId)
|
||||
for response in decoder_fn(response):
|
||||
acc[(response.topic, response.partition)] = response
|
||||
|
||||
# Order the accumulated responses by the original key order
|
||||
return (acc[k] for k in original_keys)
|
||||
return (acc[k] for k in original_keys) if acc else ()
|
||||
|
||||
#################
|
||||
# Public API #
|
||||
@@ -201,7 +205,12 @@ class KafkaClient(object):
|
||||
|
||||
encoder = partial(KafkaProtocol.encode_produce_request,
|
||||
acks=acks, timeout=timeout)
|
||||
decoder = KafkaProtocol.decode_produce_response
|
||||
|
||||
if acks == 0:
|
||||
decoder = None
|
||||
else:
|
||||
decoder = KafkaProtocol.decode_produce_response
|
||||
|
||||
resps = self._send_broker_aware_request(payloads, encoder, decoder)
|
||||
|
||||
out = []
|
||||
|
||||
@@ -76,11 +76,11 @@ class KafkaConnection(local):
|
||||
sent = self._sock.sendall(payload)
|
||||
if sent != None:
|
||||
raise RuntimeError("Kafka went away")
|
||||
self.data = self._consume_response()
|
||||
|
||||
def recv(self, requestId):
|
||||
"Get a response from Kafka"
|
||||
log.debug("Reading response %d from Kafka" % requestId)
|
||||
self.data = self._consume_response()
|
||||
return self.data
|
||||
|
||||
def close(self):
|
||||
|
||||
@@ -30,7 +30,7 @@ class Producer(object):
|
||||
|
||||
DEFAULT_ACK_TIMEOUT = 1000
|
||||
|
||||
def __init__(self, client, async=False, req_acks=ACK_NOT_REQUIRED,
|
||||
def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE,
|
||||
ack_timeout=DEFAULT_ACK_TIMEOUT):
|
||||
self.client = client
|
||||
self.async = async
|
||||
@@ -86,7 +86,7 @@ class SimpleProducer(Producer):
|
||||
for an acknowledgement
|
||||
"""
|
||||
def __init__(self, client, topic, async=False,
|
||||
req_acks=Producer.ACK_NOT_REQUIRED,
|
||||
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
|
||||
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
|
||||
self.topic = topic
|
||||
client._load_metadata_for_topics(topic)
|
||||
@@ -116,7 +116,7 @@ class KeyedProducer(Producer):
|
||||
for an acknowledgement
|
||||
"""
|
||||
def __init__(self, client, topic, partitioner=None, async=False,
|
||||
req_acks=Producer.ACK_NOT_REQUIRED,
|
||||
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
|
||||
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
|
||||
self.topic = topic
|
||||
client._load_metadata_for_topics(topic)
|
||||
|
||||
Reference in New Issue
Block a user