From bce0cad5d384c527d6f25209cb794017cd050303 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 15 Feb 2016 16:24:02 -0800 Subject: [PATCH] Revisit _wait_on_metadata to address timeout and error handling (Issue 539) --- kafka/producer/kafka.py | 39 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2443265..e8601c8 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -439,36 +439,35 @@ class KafkaProducer(object): """ # add topic to metadata topic list if it is not there already. self._sender.add_topic(topic) - partitions = self._metadata.partitions_for_topic(topic) - if partitions: - return partitions - event = threading.Event() - def event_set(*args): - event.set() - def request_update(self, event): - event.clear() + # Coordinate sleep / wake with a threading.Event + def request_update(self, _event): + _event.clear() log.debug("Requesting metadata update for topic %s.", topic) f = self._metadata.request_update() - f.add_both(event_set) + def _event_set(_event, *args): + _event.set() + f.add_both(_event_set, _event) return f begin = time.time() elapsed = 0.0 - future = request_update(self, event) - while elapsed < max_wait: + event = threading.Event() + while True: + partitions = self._metadata.partitions_for_topic(topic) + if partitions is not None: + return partitions + + log.debug("Requesting metadata update for topic %s", topic) + future = request_update(self, event) self._sender.wakeup() event.wait(max_wait - elapsed) - if future.failed(): - future = request_update(self, event) elapsed = time.time() - begin - - partitions = self._metadata.partitions_for_topic(topic) - if partitions: - return partitions - else: - raise Errors.KafkaTimeoutError( - "Failed to update metadata after %s secs.", max_wait) + if elapsed >= max_wait: + raise Errors.KafkaTimeoutError( + "Failed to update metadata after %s secs.", max_wait) + elif topic in self._metadata.unauthorized_topics: + raise Errors.TopicAuthorizationFailedError(topic) def _serialize(self, topic, key, value): # pylint: disable-msg=not-callable