Add test for unknown coordinator heartbeat task
This commit is contained in:
@@ -380,16 +380,20 @@ def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
|
|||||||
def patched_coord(mocker, coordinator):
|
def patched_coord(mocker, coordinator):
|
||||||
coordinator._subscription.subscribe(topics=['foobar'])
|
coordinator._subscription.subscribe(topics=['foobar'])
|
||||||
coordinator._subscription.needs_partition_assignment = False
|
coordinator._subscription.needs_partition_assignment = False
|
||||||
mocker.patch.object(coordinator, 'coordinator_unknown')
|
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
|
||||||
coordinator.coordinator_unknown.return_value = False
|
|
||||||
coordinator.coordinator_id = 0
|
coordinator.coordinator_id = 0
|
||||||
|
coordinator.generation = 0
|
||||||
|
mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
|
||||||
mocker.patch.object(coordinator._client, 'least_loaded_node',
|
mocker.patch.object(coordinator._client, 'least_loaded_node',
|
||||||
return_value=1)
|
return_value=1)
|
||||||
mocker.patch.object(coordinator._client, 'ready', return_value=True)
|
mocker.patch.object(coordinator._client, 'ready', return_value=True)
|
||||||
mocker.patch.object(coordinator._client, 'send')
|
mocker.patch.object(coordinator._client, 'send')
|
||||||
|
mocker.patch.object(coordinator._client, 'schedule')
|
||||||
mocker.spy(coordinator, '_failed_request')
|
mocker.spy(coordinator, '_failed_request')
|
||||||
mocker.spy(coordinator, '_handle_offset_commit_response')
|
mocker.spy(coordinator, '_handle_offset_commit_response')
|
||||||
mocker.spy(coordinator, '_handle_offset_fetch_response')
|
mocker.spy(coordinator, '_handle_offset_fetch_response')
|
||||||
|
mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
|
||||||
|
mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
|
||||||
return coordinator
|
return coordinator
|
||||||
|
|
||||||
|
|
||||||
@@ -573,3 +577,11 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
|
|||||||
assert future.value == offsets
|
assert future.value == offsets
|
||||||
assert patched_coord.coordinator_id is (None if dead else 0)
|
assert patched_coord.coordinator_id is (None if dead else 0)
|
||||||
assert patched_coord._subscription.needs_partition_assignment is reassign
|
assert patched_coord._subscription.needs_partition_assignment is reassign
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat(patched_coord):
|
||||||
|
patched_coord.coordinator_unknown.return_value = True
|
||||||
|
|
||||||
|
patched_coord.heartbeat_task()
|
||||||
|
assert patched_coord._client.schedule.call_count == 1
|
||||||
|
assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
|
||||||
|
|||||||
Reference in New Issue
Block a user