Check delayed task timeout in client.poll()

This commit is contained in:
Dana Powers
2016-01-10 15:42:26 -08:00
parent 7ee73df4c4
commit 45d26b6d32
2 changed files with 32 additions and 4 deletions

View File

@@ -302,7 +302,7 @@ class KafkaClient(object):
self._finish_connect(node_id)
# Send a metadata request if needed
metadata_timeout = self._maybe_refresh_metadata()
metadata_timeout_ms = self._maybe_refresh_metadata()
# Send scheduled tasks
for task, task_future in self._delayed_tasks.pop_ready():
@@ -314,7 +314,9 @@ class KafkaClient(object):
else:
task_future.success(result)
timeout = min(timeout_ms, metadata_timeout,
task_timeout_ms = max(0, 1000 * (
self._delayed_tasks.next_at() - time.time()))
timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms,
self.config['request_timeout_ms'])
timeout /= 1000.0

View File

@@ -1,3 +1,4 @@
import time
import pytest
@@ -242,8 +243,33 @@ def test_send(conn):
assert conn.send.called_with(request, expect_response=True)
def test_poll():
pass
def test_poll(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient()
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
# metadata timeout wins
metadata.return_value = 1000
tasks.return_value = time.time() + 2 # 2 seconds from now
cli.poll()
_poll.assert_called_with(1.0)
# user timeout wins
cli.poll(250)
_poll.assert_called_with(0.25)
# tasks timeout wins
tasks.return_value = time.time() # next task is now
cli.poll(250)
_poll.assert_called_with(0)
# default is request_timeout_ms
metadata.return_value = 1000000
tasks.return_value = time.time() + 10000
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
def test__poll():