Add Fetcher unit tests
This commit is contained in:
101
test/test_fetcher.py
Normal file
101
test/test_fetcher.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# pylint: skip-file
|
||||
from __future__ import absolute_import
|
||||
|
||||
import pytest
|
||||
|
||||
from kafka.client_async import KafkaClient
|
||||
from kafka.common import TopicPartition, OffsetAndMetadata
|
||||
from kafka.consumer.fetcher import Fetcher
|
||||
from kafka.consumer.subscription_state import SubscriptionState
|
||||
from kafka.future import Future
|
||||
from kafka.protocol.fetch import FetchRequest
|
||||
|
||||
import kafka.common as Errors
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(mocker):
|
||||
return mocker.Mock(spec=KafkaClient)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def subscription_state():
|
||||
return SubscriptionState()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fetcher(client, subscription_state):
|
||||
subscription_state.subscribe(topics=['foobar'])
|
||||
assignment = [TopicPartition('foobar', i) for i in range(3)]
|
||||
subscription_state.assign_from_subscribed(assignment)
|
||||
for tp in assignment:
|
||||
subscription_state.seek(tp, 0)
|
||||
return Fetcher(client, subscription_state)
|
||||
|
||||
|
||||
def test_init_fetches(fetcher, mocker):
|
||||
fetch_requests = [
|
||||
FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
|
||||
fetcher.config['fetch_min_bytes'],
|
||||
[('foobar', [
|
||||
(0, 0, fetcher.config['max_partition_fetch_bytes']),
|
||||
(1, 0, fetcher.config['max_partition_fetch_bytes']),
|
||||
])]),
|
||||
FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
|
||||
fetcher.config['fetch_min_bytes'],
|
||||
[('foobar', [
|
||||
(2, 0, fetcher.config['max_partition_fetch_bytes']),
|
||||
])])
|
||||
]
|
||||
|
||||
mocker.patch.object(fetcher, '_create_fetch_requests',
|
||||
return_value = dict(enumerate(fetch_requests)))
|
||||
|
||||
fetcher._records.append('foobar')
|
||||
ret = fetcher.init_fetches()
|
||||
assert fetcher._create_fetch_requests.call_count == 0
|
||||
assert ret == []
|
||||
fetcher._records.clear()
|
||||
|
||||
fetcher._iterator = 'foo'
|
||||
ret = fetcher.init_fetches()
|
||||
assert fetcher._create_fetch_requests.call_count == 0
|
||||
assert ret == []
|
||||
fetcher._iterator = None
|
||||
|
||||
ret = fetcher.init_fetches()
|
||||
for node, request in enumerate(fetch_requests):
|
||||
fetcher._client.send.assert_any_call(node, request)
|
||||
assert len(ret) == len(fetch_requests)
|
||||
|
||||
|
||||
def test_update_fetch_positions(fetcher, mocker):
|
||||
mocker.patch.object(fetcher, '_reset_offset')
|
||||
partition = TopicPartition('foobar', 0)
|
||||
|
||||
# unassigned partition
|
||||
fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
|
||||
assert fetcher._reset_offset.call_count == 0
|
||||
|
||||
# fetchable partition (has offset, not paused)
|
||||
fetcher.update_fetch_positions([partition])
|
||||
assert fetcher._reset_offset.call_count == 0
|
||||
|
||||
# partition needs reset, no committed offset
|
||||
fetcher._subscriptions.need_offset_reset(partition)
|
||||
fetcher._subscriptions.assignment[partition].awaiting_reset = False
|
||||
fetcher.update_fetch_positions([partition])
|
||||
fetcher._reset_offset.assert_called_with(partition)
|
||||
assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
|
||||
fetcher.update_fetch_positions([partition])
|
||||
fetcher._reset_offset.assert_called_with(partition)
|
||||
|
||||
# partition needs reset, has committed offset
|
||||
fetcher._reset_offset.reset_mock()
|
||||
fetcher._subscriptions.need_offset_reset(partition)
|
||||
fetcher._subscriptions.assignment[partition].awaiting_reset = False
|
||||
fetcher._subscriptions.assignment[partition].committed = 123
|
||||
mocker.patch.object(fetcher._subscriptions, 'seek')
|
||||
fetcher.update_fetch_positions([partition])
|
||||
assert fetcher._reset_offset.call_count == 0
|
||||
fetcher._subscriptions.seek.assert_called_with(partition, 123)
|
Reference in New Issue
Block a user