Revisit _wait_on_metadata to address timeout and error handling (Issue 539)

This commit is contained in:
Dana Powers
2016-02-15 16:24:02 -08:00
parent bd0caa76d0
commit bce0cad5d3

View File

@@ -439,36 +439,35 @@ class KafkaProducer(object):
"""
# add topic to metadata topic list if it is not there already.
self._sender.add_topic(topic)
partitions = self._metadata.partitions_for_topic(topic)
if partitions:
return partitions
event = threading.Event()
def event_set(*args):
event.set()
def request_update(self, event):
event.clear()
# Coordinate sleep / wake with a threading.Event
def request_update(self, _event):
_event.clear()
log.debug("Requesting metadata update for topic %s.", topic)
f = self._metadata.request_update()
f.add_both(event_set)
def _event_set(_event, *args):
_event.set()
f.add_both(_event_set, _event)
return f
begin = time.time()
elapsed = 0.0
future = request_update(self, event)
while elapsed < max_wait:
event = threading.Event()
while True:
partitions = self._metadata.partitions_for_topic(topic)
if partitions is not None:
return partitions
log.debug("Requesting metadata update for topic %s", topic)
future = request_update(self, event)
self._sender.wakeup()
event.wait(max_wait - elapsed)
if future.failed():
future = request_update(self, event)
elapsed = time.time() - begin
partitions = self._metadata.partitions_for_topic(topic)
if partitions:
return partitions
else:
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %s secs.", max_wait)
if elapsed >= max_wait:
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %s secs.", max_wait)
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
def _serialize(self, topic, key, value):
# pylint: disable-msg=not-callable