Resolve delayed task futures in KafkaClient.poll
This commit is contained in:
@@ -243,11 +243,14 @@ class KafkaClient(object):
|
|||||||
metadata_timeout = self._maybe_refresh_metadata()
|
metadata_timeout = self._maybe_refresh_metadata()
|
||||||
|
|
||||||
# Send scheduled tasks
|
# Send scheduled tasks
|
||||||
for task in self._delayed_tasks.pop_ready():
|
for task, future in self._delayed_tasks.pop_ready():
|
||||||
try:
|
try:
|
||||||
task()
|
result = task()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error("Task %s failed: %s", task, e)
|
log.error("Task %s failed: %s", task, e)
|
||||||
|
future.failure(e)
|
||||||
|
else:
|
||||||
|
future.success(result)
|
||||||
|
|
||||||
timeout = min(timeout_ms, metadata_timeout,
|
timeout = min(timeout_ms, metadata_timeout,
|
||||||
self.config['request_timeout_ms'])
|
self.config['request_timeout_ms'])
|
||||||
@@ -450,7 +453,7 @@ class DelayedTaskQueue(object):
|
|||||||
else:
|
else:
|
||||||
task, future = maybe_task
|
task, future = maybe_task
|
||||||
del self._task_map[task]
|
del self._task_map[task]
|
||||||
return task
|
return (task, future)
|
||||||
|
|
||||||
def next_at(self):
|
def next_at(self):
|
||||||
"""Number of seconds until next task is ready"""
|
"""Number of seconds until next task is ready"""
|
||||||
|
|||||||
Reference in New Issue
Block a user