102 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # pylint: skip-file
 | |
| from __future__ import absolute_import
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from kafka.client_async import KafkaClient
 | |
| from kafka.common import TopicPartition, OffsetAndMetadata
 | |
| from kafka.consumer.fetcher import Fetcher
 | |
| from kafka.consumer.subscription_state import SubscriptionState
 | |
| from kafka.future import Future
 | |
| from kafka.protocol.fetch import FetchRequest
 | |
| 
 | |
| import kafka.common as Errors
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def client(mocker):
 | |
|     return mocker.Mock(spec=KafkaClient)
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def subscription_state():
 | |
|     return SubscriptionState()
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def fetcher(client, subscription_state):
 | |
|     subscription_state.subscribe(topics=['foobar'])
 | |
|     assignment = [TopicPartition('foobar', i) for i in range(3)]
 | |
|     subscription_state.assign_from_subscribed(assignment)
 | |
|     for tp in assignment:
 | |
|         subscription_state.seek(tp, 0)
 | |
|     return Fetcher(client, subscription_state)
 | |
| 
 | |
| 
 | |
| def test_init_fetches(fetcher, mocker):
 | |
|     fetch_requests = [
 | |
|         FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
 | |
|                      fetcher.config['fetch_min_bytes'],
 | |
|                      [('foobar', [
 | |
|                          (0, 0, fetcher.config['max_partition_fetch_bytes']),
 | |
|                          (1, 0, fetcher.config['max_partition_fetch_bytes']),
 | |
|                      ])]),
 | |
|         FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
 | |
|                      fetcher.config['fetch_min_bytes'],
 | |
|                      [('foobar', [
 | |
|                          (2, 0, fetcher.config['max_partition_fetch_bytes']),
 | |
|                      ])])
 | |
|     ]
 | |
| 
 | |
|     mocker.patch.object(fetcher, '_create_fetch_requests',
 | |
|                         return_value = dict(enumerate(fetch_requests)))
 | |
| 
 | |
|     fetcher._records.append('foobar')
 | |
|     ret = fetcher.init_fetches()
 | |
|     assert fetcher._create_fetch_requests.call_count == 0
 | |
|     assert ret == []
 | |
|     fetcher._records.clear()
 | |
| 
 | |
|     fetcher._iterator = 'foo'
 | |
|     ret = fetcher.init_fetches()
 | |
|     assert fetcher._create_fetch_requests.call_count == 0
 | |
|     assert ret == []
 | |
|     fetcher._iterator = None
 | |
| 
 | |
|     ret = fetcher.init_fetches()
 | |
|     for node, request in enumerate(fetch_requests):
 | |
|         fetcher._client.send.assert_any_call(node, request)
 | |
|     assert len(ret) == len(fetch_requests)
 | |
| 
 | |
| 
 | |
| def test_update_fetch_positions(fetcher, mocker):
 | |
|     mocker.patch.object(fetcher, '_reset_offset')
 | |
|     partition = TopicPartition('foobar', 0)
 | |
| 
 | |
|     # unassigned partition
 | |
|     fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
 | |
|     assert fetcher._reset_offset.call_count == 0
 | |
| 
 | |
|     # fetchable partition (has offset, not paused)
 | |
|     fetcher.update_fetch_positions([partition])
 | |
|     assert fetcher._reset_offset.call_count == 0
 | |
| 
 | |
|     # partition needs reset, no committed offset
 | |
|     fetcher._subscriptions.need_offset_reset(partition)
 | |
|     fetcher._subscriptions.assignment[partition].awaiting_reset = False
 | |
|     fetcher.update_fetch_positions([partition])
 | |
|     fetcher._reset_offset.assert_called_with(partition)
 | |
|     assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
 | |
|     fetcher.update_fetch_positions([partition])
 | |
|     fetcher._reset_offset.assert_called_with(partition)
 | |
| 
 | |
|     # partition needs reset, has committed offset
 | |
|     fetcher._reset_offset.reset_mock()
 | |
|     fetcher._subscriptions.need_offset_reset(partition)
 | |
|     fetcher._subscriptions.assignment[partition].awaiting_reset = False
 | |
|     fetcher._subscriptions.assignment[partition].committed = 123
 | |
|     mocker.patch.object(fetcher._subscriptions, 'seek')
 | |
|     fetcher.update_fetch_positions([partition])
 | |
|     assert fetcher._reset_offset.call_count == 0
 | |
|     fetcher._subscriptions.seek.assert_called_with(partition, 123)
 | 
