Fix delayed_task timeout commit 45d26b6

This commit is contained in:
Dana Powers
2016-01-10 21:11:02 -08:00
parent ca08f759df
commit 5d2886bae3
3 changed files with 10 additions and 9 deletions

View File

@@ -314,11 +314,12 @@ class KafkaClient(object):
else: else:
task_future.success(result) task_future.success(result)
task_timeout_ms = max(0, 1000 * ( timeout = min(
self._delayed_tasks.next_at() - time.time())) timeout_ms,
timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms, metadata_timeout_ms,
self._delayed_tasks.next_at() * 1000,
self.config['request_timeout_ms']) self.config['request_timeout_ms'])
timeout /= 1000.0 timeout = max(0, timeout / 1000.0)
responses.extend(self._poll(timeout)) responses.extend(self._poll(timeout))
if not future or future.is_done: if not future or future.is_done:

View File

@@ -631,7 +631,7 @@ class KafkaConsumer(six.Iterator):
self._client.poll() self._client.poll()
timeout_at = min(self._consumer_timeout, timeout_at = min(self._consumer_timeout,
self._client._delayed_tasks.next_at(), self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time()) self._client.cluster.ttl() / 1000.0 + time.time())
for msg in self._fetcher: for msg in self._fetcher:
yield msg yield msg

View File

@@ -252,7 +252,7 @@ def test_poll(mocker):
# metadata timeout wins # metadata timeout wins
metadata.return_value = 1000 metadata.return_value = 1000
tasks.return_value = time.time() + 2 # 2 seconds from now tasks.return_value = 2
cli.poll() cli.poll()
_poll.assert_called_with(1.0) _poll.assert_called_with(1.0)
@@ -261,13 +261,13 @@ def test_poll(mocker):
_poll.assert_called_with(0.25) _poll.assert_called_with(0.25)
# tasks timeout wins # tasks timeout wins
tasks.return_value = time.time() # next task is now tasks.return_value = 0
cli.poll(250) cli.poll(250)
_poll.assert_called_with(0) _poll.assert_called_with(0)
# default is request_timeout_ms # default is request_timeout_ms
metadata.return_value = 1000000 metadata.return_value = 1000000
tasks.return_value = time.time() + 10000 tasks.return_value = 10000
cli.poll() cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)