Key is passed when creating messages for both async=False and async=True
This commit is contained in:
@@ -49,7 +49,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
# timeout is reached
|
||||
while count > 0 and timeout >= 0:
|
||||
try:
|
||||
topic_partition, msg = queue.get(timeout=timeout)
|
||||
topic_partition, msg, key = queue.get(timeout=timeout)
|
||||
|
||||
except Empty:
|
||||
break
|
||||
@@ -67,7 +67,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
# Send collected requests upstream
|
||||
reqs = []
|
||||
for topic_partition, msg in msgset.items():
|
||||
messages = create_message_set(msg, codec)
|
||||
messages = create_message_set(msg, codec, key)
|
||||
req = ProduceRequest(topic_partition.topic,
|
||||
topic_partition.partition,
|
||||
messages)
|
||||
@@ -180,12 +180,13 @@ class Producer(object):
|
||||
if any(not isinstance(m, six.binary_type) for m in msg):
|
||||
raise TypeError("all produce message payloads must be type bytes")
|
||||
|
||||
key = kwargs.pop('key', None)
|
||||
if self.async:
|
||||
for m in msg:
|
||||
self.queue.put((TopicAndPartition(topic, partition), m))
|
||||
self.queue.put((TopicAndPartition(topic, partition), m, key))
|
||||
resp = []
|
||||
else:
|
||||
messages = create_message_set(msg, self.codec)
|
||||
messages = create_message_set(msg, self.codec, key)
|
||||
req = ProduceRequest(topic, partition, messages)
|
||||
try:
|
||||
resp = self.client.send_produce_request([req], acks=self.req_acks,
|
||||
@@ -201,7 +202,7 @@ class Producer(object):
|
||||
forcefully cleaning up.
|
||||
"""
|
||||
if self.async:
|
||||
self.queue.put((STOP_ASYNC_PRODUCER, None))
|
||||
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
|
||||
self.proc.join(timeout)
|
||||
|
||||
if self.proc.is_alive():
|
||||
|
||||
@@ -56,7 +56,7 @@ class KeyedProducer(Producer):
|
||||
|
||||
def send(self, topic, key, msg):
|
||||
partition = self._next_partition(topic, key)
|
||||
return self.send_messages(topic, partition, msg)
|
||||
return self._send_messages(topic, partition, msg, key=key)
|
||||
|
||||
def __repr__(self):
|
||||
return '<KeyedProducer batch=%s>' % self.async
|
||||
|
||||
Reference in New Issue
Block a user