fix tests

This commit is contained in:
Dana Powers
2016-09-16 10:05:24 -07:00
parent aed2750501
commit ae36423391
3 changed files with 3 additions and 22 deletions

View File

@@ -141,11 +141,3 @@ def test_paused(kafka_broker, topic):
consumer.unsubscribe()
assert set() == consumer.paused()
def test_heartbeat_timeout(conn, mocker):
mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9))
mocker.patch('time.time', return_value = 1234)
consumer = KafkaConsumer('foobar')
mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
assert consumer._next_timeout() == 1234

View File

@@ -500,6 +500,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=TIMEOUT_MS)
# Manual assignment avoids overhead of consumer group mgmt

View File

@@ -33,7 +33,7 @@ def fetcher(client, subscription_state):
return Fetcher(client, subscription_state, Metrics())
def test_init_fetches(fetcher, mocker):
def test_send_fetches(fetcher, mocker):
fetch_requests = [
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
@@ -53,19 +53,7 @@ def test_init_fetches(fetcher, mocker):
mocker.patch.object(fetcher, '_create_fetch_requests',
return_value = dict(enumerate(fetch_requests)))
fetcher._records.append('foobar')
ret = fetcher.init_fetches()
assert fetcher._create_fetch_requests.call_count == 0
assert ret == []
fetcher._records.clear()
fetcher._iterator = 'foo'
ret = fetcher.init_fetches()
assert fetcher._create_fetch_requests.call_count == 0
assert ret == []
fetcher._iterator = None
ret = fetcher.init_fetches()
ret = fetcher.send_fetches()
for node, request in enumerate(fetch_requests):
fetcher._client.send.assert_any_call(node, request)
assert len(ret) == len(fetch_requests)