Merge pull request #598 from zackdever/producer-optimization
Producer optimization
This commit is contained in:
@@ -443,12 +443,15 @@ class KafkaProducer(object):
|
||||
self._sender.add_topic(topic)
|
||||
begin = time.time()
|
||||
elapsed = 0.0
|
||||
metadata_event = threading.Event()
|
||||
metadata_event = None
|
||||
while True:
|
||||
partitions = self._metadata.partitions_for_topic(topic)
|
||||
if partitions is not None:
|
||||
return partitions
|
||||
|
||||
if not metadata_event:
|
||||
metadata_event = threading.Event()
|
||||
|
||||
log.debug("Requesting metadata update for topic %s", topic)
|
||||
|
||||
metadata_event.clear()
|
||||
|
||||
@@ -4,7 +4,6 @@ import collections
|
||||
import copy
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
@@ -45,7 +44,7 @@ class Sender(threading.Thread):
|
||||
self._metadata = client.cluster
|
||||
self._running = True
|
||||
self._force_close = False
|
||||
self._topics_to_add = []
|
||||
self._topics_to_add = set()
|
||||
|
||||
def run(self):
|
||||
"""The main run loop for the sender thread."""
|
||||
@@ -158,8 +157,9 @@ class Sender(threading.Thread):
|
||||
self.initiate_close()
|
||||
|
||||
def add_topic(self, topic):
|
||||
self._topics_to_add.append(topic)
|
||||
self.wakeup()
|
||||
if topic not in self._topics_to_add:
|
||||
self._topics_to_add.add(topic)
|
||||
self.wakeup()
|
||||
|
||||
def _failed_produce(self, batches, node_id, error):
|
||||
log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
|
||||
|
||||
Reference in New Issue
Block a user