If a completed future is polled, do not block
This commit is contained in:
@@ -314,14 +314,21 @@ class KafkaClient(object):
|
|||||||
else:
|
else:
|
||||||
task_future.success(result)
|
task_future.success(result)
|
||||||
|
|
||||||
timeout = min(
|
# If we got a future that is already done, dont block in _poll
|
||||||
timeout_ms,
|
if future and future.is_done:
|
||||||
metadata_timeout_ms,
|
timeout = 0
|
||||||
self._delayed_tasks.next_at() * 1000,
|
else:
|
||||||
self.config['request_timeout_ms'])
|
timeout = min(
|
||||||
timeout = max(0, timeout / 1000.0)
|
timeout_ms,
|
||||||
|
metadata_timeout_ms,
|
||||||
|
self._delayed_tasks.next_at() * 1000,
|
||||||
|
self.config['request_timeout_ms'])
|
||||||
|
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
|
||||||
|
|
||||||
responses.extend(self._poll(timeout))
|
responses.extend(self._poll(timeout))
|
||||||
|
|
||||||
|
# If all we had was a timeout (future is None) - only do one poll
|
||||||
|
# If we do have a future, we keep looping until it is done
|
||||||
if not future or future.is_done:
|
if not future or future.is_done:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user