Add heartbeat timeout test
This commit is contained in:
@@ -1,16 +1,17 @@
|
|||||||
import collections
|
import collections
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import os
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from kafka import SimpleClient, SimpleProducer
|
from kafka import SimpleClient
|
||||||
from kafka.common import TopicPartition
|
from kafka.common import TopicPartition
|
||||||
from kafka.conn import BrokerConnection, ConnectionStates
|
from kafka.conn import ConnectionStates
|
||||||
from kafka.consumer.group import KafkaConsumer
|
from kafka.consumer.group import KafkaConsumer
|
||||||
|
from kafka.future import Future
|
||||||
|
from kafka.protocol.metadata import MetadataResponse
|
||||||
|
|
||||||
from test.conftest import version
|
from test.conftest import version
|
||||||
from test.testutil import random_string
|
from test.testutil import random_string
|
||||||
@@ -115,3 +116,23 @@ def test_group(kafka_broker, topic):
|
|||||||
finally:
|
finally:
|
||||||
for c in range(num_consumers):
|
for c in range(num_consumers):
|
||||||
stop[c].set()
|
stop[c].set()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def conn(mocker):
|
||||||
|
conn = mocker.patch('kafka.client_async.BrokerConnection')
|
||||||
|
conn.return_value = conn
|
||||||
|
conn.state = ConnectionStates.CONNECTED
|
||||||
|
conn.send.return_value = Future().success(
|
||||||
|
MetadataResponse(
|
||||||
|
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
|
||||||
|
[])) # topics
|
||||||
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat_timeout(conn, mocker):
|
||||||
|
mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
|
||||||
|
mocker.patch('time.time', return_value = 1234)
|
||||||
|
consumer = KafkaConsumer('foobar')
|
||||||
|
mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
|
||||||
|
assert consumer._next_timeout() == 1234
|
||||||
|
|||||||
Reference in New Issue
Block a user