Cleaner event handling in _wait_on_metadata
This commit is contained in:
@@ -438,35 +438,29 @@ class KafkaProducer(object):
|
||||
"""
|
||||
# add topic to metadata topic list if it is not there already.
|
||||
self._sender.add_topic(topic)
|
||||
|
||||
# Coordinate sleep / wake with a threading.Event
|
||||
def request_update(self, _event):
|
||||
_event.clear()
|
||||
log.debug("Requesting metadata update for topic %s.", topic)
|
||||
f = self._metadata.request_update()
|
||||
def _event_set(_event, *args):
|
||||
_event.set()
|
||||
f.add_both(_event_set, _event)
|
||||
return f
|
||||
|
||||
begin = time.time()
|
||||
elapsed = 0.0
|
||||
event = threading.Event()
|
||||
metadata_event = threading.Event()
|
||||
while True:
|
||||
partitions = self._metadata.partitions_for_topic(topic)
|
||||
if partitions is not None:
|
||||
return partitions
|
||||
|
||||
log.debug("Requesting metadata update for topic %s", topic)
|
||||
future = request_update(self, event)
|
||||
|
||||
metadata_event.clear()
|
||||
future = self._metadata.request_update()
|
||||
future.add_both(lambda e, *args: e.set(), metadata_event)
|
||||
self._sender.wakeup()
|
||||
event.wait(max_wait - elapsed)
|
||||
metadata_event.wait(max_wait - elapsed)
|
||||
elapsed = time.time() - begin
|
||||
if elapsed >= max_wait:
|
||||
if not metadata_event.is_set():
|
||||
raise Errors.KafkaTimeoutError(
|
||||
"Failed to update metadata after %s secs.", max_wait)
|
||||
elif topic in self._metadata.unauthorized_topics:
|
||||
raise Errors.TopicAuthorizationFailedError(topic)
|
||||
else:
|
||||
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
|
||||
|
||||
def _serialize(self, topic, key, value):
|
||||
# pylint: disable-msg=not-callable
|
||||
|
Reference in New Issue
Block a user