Merge pull request #501 from dpkp/coordinator_tests
ConsumerCoordinator cleanups and test coverage
This commit is contained in:
@@ -157,6 +157,9 @@ class SubscriptionState(object):
|
||||
self._group_subscription.update(topics)
|
||||
|
||||
def mark_for_reassignment(self):
|
||||
if self._user_assignment:
|
||||
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
|
||||
assert self.subscription is not None, 'Subscription required'
|
||||
self._group_subscription.intersection_update(self.subscription)
|
||||
self.needs_partition_assignment = True
|
||||
|
||||
|
||||
@@ -621,7 +621,7 @@ class HeartbeatTask(object):
|
||||
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
|
||||
self._client.schedule(self, etd)
|
||||
|
||||
|
||||
'''
|
||||
class GroupCoordinatorMetrics(object):
|
||||
def __init__(self, metrics, prefix, tags=None):
|
||||
self.metrics = metrics
|
||||
@@ -674,5 +674,4 @@ class GroupCoordinatorMetrics(object):
|
||||
"The number of seconds since the last controller heartbeat",
|
||||
tags), lastHeartbeat)
|
||||
"""
|
||||
|
||||
|
||||
'''
|
||||
|
||||
@@ -8,6 +8,7 @@ import time
|
||||
import six
|
||||
|
||||
from .base import BaseCoordinator
|
||||
from .assignors.roundrobin import RoundRobinPartitionAssignor
|
||||
from .protocol import (
|
||||
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
|
||||
ConsumerProtocol)
|
||||
@@ -29,7 +30,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
'enable_auto_commit': True,
|
||||
'auto_commit_interval_ms': 5000,
|
||||
'default_offset_commit_callback': lambda offsets, response: True,
|
||||
'assignors': (),
|
||||
'assignors': (RoundRobinPartitionAssignor,),
|
||||
'session_timeout_ms': 30000,
|
||||
'heartbeat_interval_ms': 3000,
|
||||
'retry_backoff_ms': 100,
|
||||
@@ -100,6 +101,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
def group_protocols(self):
|
||||
"""Returns list of preferred (protocols, metadata)"""
|
||||
topics = self._subscription.subscription
|
||||
assert topics is not None, 'Consumer has not subscribed to topics'
|
||||
metadata_list = []
|
||||
for assignor in self.config['assignors']:
|
||||
metadata = assignor.metadata(topics)
|
||||
@@ -111,7 +113,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
# if we encounter any unauthorized topics, raise an exception
|
||||
# TODO
|
||||
#if self._cluster.unauthorized_topics:
|
||||
# raise Errors.TopicAuthorizationError(self._cluster.unauthorized_topics)
|
||||
# raise TopicAuthorizationError(self._cluster.unauthorized_topics)
|
||||
|
||||
if self._subscription.subscribed_pattern:
|
||||
topics = []
|
||||
@@ -122,7 +124,8 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
self._subscription.change_subscription(topics)
|
||||
self._client.set_topics(self._subscription.group_subscription())
|
||||
|
||||
# check if there are any changes to the metadata which should trigger a rebalance
|
||||
# check if there are any changes to the metadata which should trigger
|
||||
# a rebalance
|
||||
if self._subscription_metadata_changed():
|
||||
if self.config['api_version'] >= (0, 9):
|
||||
self._subscription.mark_for_reassignment()
|
||||
@@ -182,7 +185,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
# execute the user's callback after rebalance
|
||||
if self._subscription.listener:
|
||||
try:
|
||||
self._subscriptions.listener.on_partitions_assigned(assigned)
|
||||
self._subscription.listener.on_partitions_assigned(assigned)
|
||||
except Exception:
|
||||
log.exception("User provided listener failed on partition"
|
||||
" assignment: %s", assigned)
|
||||
@@ -263,6 +266,9 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
Returns:
|
||||
dict: {TopicPartition: OffsetAndMetadata}
|
||||
"""
|
||||
if not partitions:
|
||||
return {}
|
||||
|
||||
while True:
|
||||
if self.config['api_version'] >= (0, 8, 2):
|
||||
self.ensure_coordinator_known()
|
||||
@@ -297,11 +303,16 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
Returns:
|
||||
Future: indicating whether the commit was successful or not
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
|
||||
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
|
||||
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
|
||||
offsets.values()))
|
||||
if callback is None:
|
||||
callback = self.config['default_offset_commit_callback']
|
||||
self._subscription.needs_fetch_committed_offsets = True
|
||||
future = self._send_offset_commit_request(offsets)
|
||||
future.add_both(callback, offsets)
|
||||
return future
|
||||
|
||||
def commit_offsets_sync(self, offsets):
|
||||
"""Commit specific offsets synchronously.
|
||||
@@ -314,6 +325,10 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
|
||||
Raises error on failure
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
|
||||
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
|
||||
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
|
||||
offsets.values()))
|
||||
if not offsets:
|
||||
return
|
||||
|
||||
@@ -325,7 +340,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
self._client.poll(future=future)
|
||||
|
||||
if future.succeeded():
|
||||
return
|
||||
return future.value
|
||||
|
||||
if not future.retriable():
|
||||
raise future.exception # pylint: disable-msg=raising-bad-type
|
||||
@@ -369,6 +384,13 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
Returns:
|
||||
Future: indicating whether the commit was successful or not
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
|
||||
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
|
||||
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
|
||||
offsets.values()))
|
||||
if not offsets:
|
||||
return Future().success(None)
|
||||
|
||||
if self.config['api_version'] >= (0, 8, 2):
|
||||
if self.coordinator_unknown():
|
||||
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
|
||||
@@ -376,9 +398,6 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
else:
|
||||
node_id = self._client.least_loaded_node()
|
||||
|
||||
if not offsets:
|
||||
return Future().failure(None)
|
||||
|
||||
# create the offset commit request
|
||||
offset_data = collections.defaultdict(dict)
|
||||
for tp, offset in six.iteritems(offsets):
|
||||
@@ -428,7 +447,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
future = Future()
|
||||
_f = self._client.send(node_id, request)
|
||||
_f.add_callback(self._handle_offset_commit_response, offsets, future)
|
||||
_f.add_errback(self._failed_request, future)
|
||||
_f.add_errback(self._failed_request, node_id, request, future)
|
||||
return future
|
||||
|
||||
def _handle_offset_commit_response(self, offsets, future, response):
|
||||
@@ -513,6 +532,11 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
Returns:
|
||||
Future: resolves to dict of offsets: {TopicPartition: int}
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
|
||||
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
|
||||
if not partitions:
|
||||
return Future().success({})
|
||||
|
||||
if self.config['api_version'] >= (0, 8, 2):
|
||||
if self.coordinator_unknown():
|
||||
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
|
||||
@@ -541,7 +565,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
||||
future = Future()
|
||||
_f = self._client.send(node_id, request)
|
||||
_f.add_callback(self._handle_offset_fetch_response, future)
|
||||
_f.add_errback(self._failed_request, future)
|
||||
_f.add_errback(self._failed_request, node_id, request, future)
|
||||
return future
|
||||
|
||||
def _handle_offset_fetch_response(self, future, response):
|
||||
|
||||
568
test/test_coordinator.py
Normal file
568
test/test_coordinator.py
Normal file
@@ -0,0 +1,568 @@
|
||||
# pylint: skip-file
|
||||
from __future__ import absolute_import
|
||||
|
||||
import pytest
|
||||
|
||||
from kafka.client_async import KafkaClient
|
||||
from kafka.common import TopicPartition, OffsetAndMetadata
|
||||
from kafka.consumer.subscription_state import (
|
||||
SubscriptionState, ConsumerRebalanceListener)
|
||||
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
|
||||
from kafka.coordinator.consumer import ConsumerCoordinator
|
||||
from kafka.coordinator.protocol import (
|
||||
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
|
||||
from kafka.conn import ConnectionStates
|
||||
from kafka.future import Future
|
||||
from kafka.protocol.commit import (
|
||||
OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
|
||||
OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
|
||||
OffsetFetchResponse)
|
||||
from kafka.protocol.metadata import MetadataResponse
|
||||
|
||||
import kafka.common as Errors
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn(mocker):
|
||||
conn = mocker.patch('kafka.client_async.BrokerConnection')
|
||||
conn.return_value = conn
|
||||
conn.state = ConnectionStates.CONNECTED
|
||||
conn.send.return_value = Future().success(
|
||||
MetadataResponse(
|
||||
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
|
||||
[])) # topics
|
||||
return conn
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def coordinator(conn):
|
||||
return ConsumerCoordinator(KafkaClient(), SubscriptionState())
|
||||
|
||||
|
||||
def test_init(conn):
|
||||
cli = KafkaClient()
|
||||
coordinator = ConsumerCoordinator(cli, SubscriptionState())
|
||||
|
||||
# metadata update on init
|
||||
assert cli.cluster._need_update is True
|
||||
assert coordinator._handle_metadata_update in cli.cluster._listeners
|
||||
|
||||
|
||||
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
|
||||
def test_autocommit_enable_api_version(conn, api_version):
|
||||
coordinator = ConsumerCoordinator(
|
||||
KafkaClient(), SubscriptionState(), api_version=api_version)
|
||||
if api_version < (0, 8, 1):
|
||||
assert coordinator._auto_commit_task is None
|
||||
else:
|
||||
assert coordinator._auto_commit_task is not None
|
||||
|
||||
|
||||
def test_protocol_type(coordinator):
|
||||
assert coordinator.protocol_type() is 'consumer'
|
||||
|
||||
|
||||
def test_group_protocols(coordinator):
|
||||
# Requires a subscription
|
||||
try:
|
||||
coordinator.group_protocols()
|
||||
except AssertionError:
|
||||
pass
|
||||
else:
|
||||
assert False, 'Exception not raised when expected'
|
||||
|
||||
coordinator._subscription.subscribe(topics=['foobar'])
|
||||
assert coordinator.group_protocols() == [(
|
||||
'roundrobin',
|
||||
ConsumerProtocolMemberMetadata(
|
||||
RoundRobinPartitionAssignor.version,
|
||||
['foobar'],
|
||||
b'')
|
||||
)]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
|
||||
def test_pattern_subscription(coordinator, api_version):
|
||||
coordinator.config['api_version'] = api_version
|
||||
coordinator._subscription.subscribe(pattern='foo')
|
||||
assert coordinator._subscription.subscription == set([])
|
||||
assert coordinator._subscription_metadata_changed() is False
|
||||
assert coordinator._subscription.needs_partition_assignment is False
|
||||
|
||||
cluster = coordinator._client.cluster
|
||||
cluster.update_metadata(MetadataResponse(
|
||||
# brokers
|
||||
[(0, 'foo', 12), (1, 'bar', 34)],
|
||||
# topics
|
||||
[(0, 'fizz', []),
|
||||
(0, 'foo1', [(0, 0, 0, [], [])]),
|
||||
(0, 'foo2', [(0, 0, 1, [], [])])]))
|
||||
assert coordinator._subscription.subscription == set(['foo1', 'foo2'])
|
||||
|
||||
# 0.9 consumers should trigger dynamic partition assignment
|
||||
if api_version >= (0, 9):
|
||||
assert coordinator._subscription.needs_partition_assignment is True
|
||||
assert coordinator._subscription.assignment == {}
|
||||
|
||||
# earlier consumers get all partitions assigned locally
|
||||
else:
|
||||
assert coordinator._subscription.needs_partition_assignment is False
|
||||
assert set(coordinator._subscription.assignment.keys()) == set([
|
||||
TopicPartition('foo1', 0),
|
||||
TopicPartition('foo2', 0)])
|
||||
|
||||
|
||||
def test_lookup_assignor(coordinator):
|
||||
assignor = coordinator._lookup_assignor('roundrobin')
|
||||
assert assignor is RoundRobinPartitionAssignor
|
||||
assert coordinator._lookup_assignor('foobar') is None
|
||||
|
||||
|
||||
def test_join_complete(mocker, coordinator):
|
||||
coordinator._subscription.subscribe(topics=['foobar'])
|
||||
assignor = RoundRobinPartitionAssignor()
|
||||
coordinator.config['assignors'] = (assignor,)
|
||||
mocker.spy(assignor, 'on_assignment')
|
||||
assert assignor.on_assignment.call_count == 0
|
||||
assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
|
||||
coordinator._on_join_complete(
|
||||
0, 'member-foo', 'roundrobin', assignment.encode())
|
||||
assert assignor.on_assignment.call_count == 1
|
||||
assignor.on_assignment.assert_called_with(assignment)
|
||||
|
||||
|
||||
def test_subscription_listener(mocker, coordinator):
|
||||
listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
|
||||
coordinator._subscription.subscribe(
|
||||
topics=['foobar'],
|
||||
listener=listener)
|
||||
|
||||
coordinator._on_join_prepare(0, 'member-foo')
|
||||
assert listener.on_partitions_revoked.call_count == 1
|
||||
listener.on_partitions_revoked.assert_called_with(set([]))
|
||||
|
||||
assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
|
||||
coordinator._on_join_complete(
|
||||
0, 'member-foo', 'roundrobin', assignment.encode())
|
||||
assert listener.on_partitions_assigned.call_count == 1
|
||||
listener.on_partitions_assigned.assert_called_with(set([
|
||||
TopicPartition('foobar', 0),
|
||||
TopicPartition('foobar', 1)]))
|
||||
|
||||
|
||||
def test_subscription_listener_failure(mocker, coordinator):
|
||||
listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
|
||||
coordinator._subscription.subscribe(
|
||||
topics=['foobar'],
|
||||
listener=listener)
|
||||
|
||||
# exception raised in listener should not be re-raised by coordinator
|
||||
listener.on_partitions_revoked.side_effect = Exception('crash')
|
||||
coordinator._on_join_prepare(0, 'member-foo')
|
||||
assert listener.on_partitions_revoked.call_count == 1
|
||||
|
||||
assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
|
||||
coordinator._on_join_complete(
|
||||
0, 'member-foo', 'roundrobin', assignment.encode())
|
||||
assert listener.on_partitions_assigned.call_count == 1
|
||||
|
||||
|
||||
def test_perform_assignment(mocker, coordinator):
|
||||
member_metadata = {
|
||||
'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''),
|
||||
'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'')
|
||||
}
|
||||
assignments = {
|
||||
'member-foo': ConsumerProtocolMemberAssignment(
|
||||
0, [('foo1', [0])], b''),
|
||||
'member-bar': ConsumerProtocolMemberAssignment(
|
||||
0, [('foo1', [1])], b'')
|
||||
}
|
||||
|
||||
mocker.patch.object(RoundRobinPartitionAssignor, 'assign')
|
||||
RoundRobinPartitionAssignor.assign.return_value = assignments
|
||||
|
||||
ret = coordinator._perform_assignment(
|
||||
'member-foo', 'roundrobin',
|
||||
[(member, metadata.encode())
|
||||
for member, metadata in member_metadata.items()])
|
||||
|
||||
assert RoundRobinPartitionAssignor.assign.call_count == 1
|
||||
RoundRobinPartitionAssignor.assign.assert_called_with(
|
||||
coordinator._client.cluster, member_metadata)
|
||||
assert ret == assignments
|
||||
|
||||
|
||||
def test_on_join_prepare(coordinator):
|
||||
coordinator._subscription.subscribe(topics=['foobar'])
|
||||
coordinator._on_join_prepare(0, 'member-foo')
|
||||
assert coordinator._subscription.needs_partition_assignment is True
|
||||
|
||||
|
||||
def test_need_rejoin(coordinator):
|
||||
# No subscription - no rejoin
|
||||
assert coordinator.need_rejoin() is False
|
||||
|
||||
coordinator._subscription.subscribe(topics=['foobar'])
|
||||
assert coordinator.need_rejoin() is True
|
||||
|
||||
coordinator._subscription.needs_partition_assignment = False
|
||||
coordinator.rejoin_needed = False
|
||||
assert coordinator.need_rejoin() is False
|
||||
|
||||
coordinator._subscription.needs_partition_assignment = True
|
||||
assert coordinator.need_rejoin() is True
|
||||
|
||||
|
||||
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
|
||||
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
|
||||
return_value = {
|
||||
TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
|
||||
TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')})
|
||||
coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)])
|
||||
assert coordinator._subscription.needs_fetch_committed_offsets is True
|
||||
coordinator.refresh_committed_offsets_if_needed()
|
||||
assignment = coordinator._subscription.assignment
|
||||
assert assignment[TopicPartition('foobar', 0)].committed == 123
|
||||
assert TopicPartition('foobar', 1) not in assignment
|
||||
assert coordinator._subscription.needs_fetch_committed_offsets is False
|
||||
|
||||
|
||||
def test_fetch_committed_offsets(mocker, coordinator):
|
||||
|
||||
# No partitions, no IO polling
|
||||
mocker.patch.object(coordinator._client, 'poll')
|
||||
assert coordinator.fetch_committed_offsets([]) == {}
|
||||
assert coordinator._client.poll.call_count == 0
|
||||
|
||||
# general case -- send offset fetch request, get successful future
|
||||
mocker.patch.object(coordinator, 'ensure_coordinator_known')
|
||||
mocker.patch.object(coordinator, '_send_offset_fetch_request',
|
||||
return_value=Future().success('foobar'))
|
||||
partitions = [TopicPartition('foobar', 0)]
|
||||
ret = coordinator.fetch_committed_offsets(partitions)
|
||||
assert ret == 'foobar'
|
||||
coordinator._send_offset_fetch_request.assert_called_with(partitions)
|
||||
assert coordinator._client.poll.call_count == 1
|
||||
|
||||
# Failed future is raised if not retriable
|
||||
coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError)
|
||||
coordinator._client.poll.reset_mock()
|
||||
try:
|
||||
coordinator.fetch_committed_offsets(partitions)
|
||||
except AssertionError:
|
||||
pass
|
||||
else:
|
||||
assert False, 'Exception not raised when expected'
|
||||
assert coordinator._client.poll.call_count == 1
|
||||
|
||||
coordinator._client.poll.reset_mock()
|
||||
coordinator._send_offset_fetch_request.side_effect = [
|
||||
Future().failure(Errors.RequestTimedOutError),
|
||||
Future().success('fizzbuzz')]
|
||||
|
||||
ret = coordinator.fetch_committed_offsets(partitions)
|
||||
assert ret == 'fizzbuzz'
|
||||
assert coordinator._client.poll.call_count == 2 # call + retry
|
||||
|
||||
|
||||
def test_close(mocker, coordinator):
|
||||
mocker.patch.object(coordinator, '_maybe_auto_commit_offsets_sync')
|
||||
mocker.patch.object(coordinator, '_handle_leave_group_response')
|
||||
coordinator.coordinator_id = 0
|
||||
coordinator.generation = 1
|
||||
cli = coordinator._client
|
||||
mocker.patch.object(cli, 'unschedule')
|
||||
mocker.patch.object(cli, 'send', return_value=Future().success('foobar'))
|
||||
mocker.patch.object(cli, 'poll')
|
||||
|
||||
coordinator.close()
|
||||
assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
|
||||
cli.unschedule.assert_called_with(coordinator.heartbeat_task)
|
||||
coordinator._handle_leave_group_response.assert_called_with('foobar')
|
||||
|
||||
assert coordinator.generation == -1
|
||||
assert coordinator.member_id == ''
|
||||
assert coordinator.rejoin_needed is True
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def offsets():
|
||||
return {
|
||||
TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
|
||||
TopicPartition('foobar', 1): OffsetAndMetadata(234, b''),
|
||||
}
|
||||
|
||||
|
||||
def test_commit_offsets_async(mocker, coordinator, offsets):
|
||||
mocker.patch.object(coordinator._client, 'poll')
|
||||
mocker.patch.object(coordinator, 'ensure_coordinator_known')
|
||||
mocker.patch.object(coordinator, '_send_offset_commit_request',
|
||||
return_value=Future().success('fizzbuzz'))
|
||||
ret = coordinator.commit_offsets_async(offsets)
|
||||
assert isinstance(ret, Future)
|
||||
assert coordinator._send_offset_commit_request.call_count == 1
|
||||
|
||||
|
||||
def test_commit_offsets_sync(mocker, coordinator, offsets):
|
||||
mocker.patch.object(coordinator, 'ensure_coordinator_known')
|
||||
mocker.patch.object(coordinator, '_send_offset_commit_request',
|
||||
return_value=Future().success('fizzbuzz'))
|
||||
cli = coordinator._client
|
||||
mocker.patch.object(cli, 'poll')
|
||||
|
||||
# No offsets, no calls
|
||||
assert coordinator.commit_offsets_sync({}) is None
|
||||
assert coordinator._send_offset_commit_request.call_count == 0
|
||||
assert cli.poll.call_count == 0
|
||||
|
||||
ret = coordinator.commit_offsets_sync(offsets)
|
||||
assert coordinator._send_offset_commit_request.call_count == 1
|
||||
assert cli.poll.call_count == 1
|
||||
assert ret == 'fizzbuzz'
|
||||
|
||||
# Failed future is raised if not retriable
|
||||
coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError)
|
||||
coordinator._client.poll.reset_mock()
|
||||
try:
|
||||
coordinator.commit_offsets_sync(offsets)
|
||||
except AssertionError:
|
||||
pass
|
||||
else:
|
||||
assert False, 'Exception not raised when expected'
|
||||
assert coordinator._client.poll.call_count == 1
|
||||
|
||||
coordinator._client.poll.reset_mock()
|
||||
coordinator._send_offset_commit_request.side_effect = [
|
||||
Future().failure(Errors.RequestTimedOutError),
|
||||
Future().success('fizzbuzz')]
|
||||
|
||||
ret = coordinator.commit_offsets_sync(offsets)
|
||||
assert ret == 'fizzbuzz'
|
||||
assert coordinator._client.poll.call_count == 2 # call + retry
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'api_version,enable,error,task_disable,commit_offsets,warn,exc', [
|
||||
((0, 8), True, None, False, False, False, False),
|
||||
((0, 9), False, None, False, False, False, False),
|
||||
((0, 9), True, Errors.UnknownMemberIdError(), True, True, True, False),
|
||||
((0, 9), True, Errors.IllegalGenerationError(), True, True, True, False),
|
||||
((0, 9), True, Errors.RebalanceInProgressError(), True, True, True, False),
|
||||
((0, 9), True, Exception(), True, True, False, True),
|
||||
((0, 9), True, None, True, True, False, False),
|
||||
])
|
||||
def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
|
||||
api_version, enable, error, task_disable,
|
||||
commit_offsets, warn, exc):
|
||||
auto_commit_task = mocker.patch.object(coordinator, '_auto_commit_task')
|
||||
commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
|
||||
side_effect=error)
|
||||
mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
|
||||
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
|
||||
|
||||
coordinator.config['api_version'] = api_version
|
||||
coordinator.config['enable_auto_commit'] = enable
|
||||
assert coordinator._maybe_auto_commit_offsets_sync() is None
|
||||
assert auto_commit_task.disable.call_count == (1 if task_disable else 0)
|
||||
assert commit_sync.call_count == (1 if commit_offsets else 0)
|
||||
assert mock_warn.call_count == (1 if warn else 0)
|
||||
assert mock_exc.call_count == (1 if exc else 0)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_coord(mocker, coordinator):
|
||||
coordinator._subscription.subscribe(topics=['foobar'])
|
||||
coordinator._subscription.needs_partition_assignment = False
|
||||
mocker.patch.object(coordinator, 'coordinator_unknown')
|
||||
coordinator.coordinator_unknown.return_value = False
|
||||
coordinator.coordinator_id = 0
|
||||
mocker.patch.object(coordinator._client, 'least_loaded_node',
|
||||
return_value=1)
|
||||
mocker.patch.object(coordinator._client, 'send')
|
||||
mocker.spy(coordinator, '_failed_request')
|
||||
mocker.spy(coordinator, '_handle_offset_commit_response')
|
||||
mocker.spy(coordinator, '_handle_offset_fetch_response')
|
||||
return coordinator
|
||||
|
||||
|
||||
def test_send_offset_commit_request_fail(patched_coord, offsets):
|
||||
patched_coord.coordinator_unknown.return_value = True
|
||||
patched_coord.coordinator_id = None
|
||||
|
||||
# No offsets
|
||||
ret = patched_coord._send_offset_commit_request({})
|
||||
assert isinstance(ret, Future)
|
||||
assert ret.succeeded()
|
||||
|
||||
# No coordinator
|
||||
ret = patched_coord._send_offset_commit_request(offsets)
|
||||
assert ret.failed()
|
||||
assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('api_version,req_type', [
|
||||
((0, 8, 1), OffsetCommitRequest_v0),
|
||||
((0, 8, 2), OffsetCommitRequest_v1),
|
||||
((0, 9), OffsetCommitRequest_v2)])
|
||||
def test_send_offset_commit_request_versions(patched_coord, offsets,
|
||||
api_version, req_type):
|
||||
# assuming fixture sets coordinator=0, least_loaded_node=1
|
||||
expect_node = 0 if api_version >= (0, 8, 2) else 1
|
||||
patched_coord.config['api_version'] = api_version
|
||||
|
||||
patched_coord._send_offset_commit_request(offsets)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
assert node == expect_node, 'Unexpected coordinator node'
|
||||
assert isinstance(request, req_type)
|
||||
|
||||
|
||||
def test_send_offset_commit_request_failure(patched_coord, offsets):
|
||||
_f = Future()
|
||||
patched_coord._client.send.return_value = _f
|
||||
future = patched_coord._send_offset_commit_request(offsets)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
error = Exception()
|
||||
_f.failure(error)
|
||||
patched_coord._failed_request.assert_called_with(0, request, future, error)
|
||||
assert future.failed()
|
||||
assert future.exception is error
|
||||
|
||||
|
||||
def test_send_offset_commit_request_success(patched_coord, offsets):
|
||||
_f = Future()
|
||||
patched_coord._client.send.return_value = _f
|
||||
future = patched_coord._send_offset_commit_request(offsets)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])])
|
||||
_f.success(response)
|
||||
patched_coord._handle_offset_commit_response.assert_called_with(
|
||||
offsets, future, response)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('response,error,dead,reassign', [
|
||||
(OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]),
|
||||
Errors.GroupAuthorizationFailedError, False, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]),
|
||||
Errors.OffsetMetadataTooLargeError, False, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]),
|
||||
Errors.InvalidCommitOffsetSizeError, False, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]),
|
||||
Errors.GroupLoadInProgressError, False, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]),
|
||||
Errors.GroupCoordinatorNotAvailableError, True, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]),
|
||||
Errors.NotCoordinatorForGroupError, True, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]),
|
||||
Errors.RequestTimedOutError, True, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]),
|
||||
Errors.UnknownMemberIdError, False, True),
|
||||
(OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]),
|
||||
Errors.IllegalGenerationError, False, True),
|
||||
(OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]),
|
||||
Errors.RebalanceInProgressError, False, True),
|
||||
(OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]),
|
||||
Errors.InvalidTopicError, False, False),
|
||||
(OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]),
|
||||
Errors.TopicAuthorizationFailedError, False, False),
|
||||
])
|
||||
def test_handle_offset_commit_response(patched_coord, offsets,
|
||||
response, error, dead, reassign):
|
||||
future = Future()
|
||||
patched_coord._handle_offset_commit_response(offsets, future, response)
|
||||
assert isinstance(future.exception, error)
|
||||
assert patched_coord.coordinator_id is (None if dead else 0)
|
||||
assert patched_coord._subscription.needs_partition_assignment is reassign
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def partitions():
|
||||
return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]
|
||||
|
||||
|
||||
def test_send_offset_fetch_request_fail(patched_coord, partitions):
|
||||
patched_coord.coordinator_unknown.return_value = True
|
||||
patched_coord.coordinator_id = None
|
||||
|
||||
# No partitions
|
||||
ret = patched_coord._send_offset_fetch_request([])
|
||||
assert isinstance(ret, Future)
|
||||
assert ret.succeeded()
|
||||
assert ret.value == {}
|
||||
|
||||
# No coordinator
|
||||
ret = patched_coord._send_offset_fetch_request(partitions)
|
||||
assert ret.failed()
|
||||
assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('api_version,req_type', [
|
||||
((0, 8, 1), OffsetFetchRequest_v0),
|
||||
((0, 8, 2), OffsetFetchRequest_v1),
|
||||
((0, 9), OffsetFetchRequest_v1)])
|
||||
def test_send_offset_fetch_request_versions(patched_coord, partitions,
|
||||
api_version, req_type):
|
||||
# assuming fixture sets coordinator=0, least_loaded_node=1
|
||||
expect_node = 0 if api_version >= (0, 8, 2) else 1
|
||||
patched_coord.config['api_version'] = api_version
|
||||
|
||||
patched_coord._send_offset_fetch_request(partitions)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
assert node == expect_node, 'Unexpected coordinator node'
|
||||
assert isinstance(request, req_type)
|
||||
|
||||
|
||||
def test_send_offset_fetch_request_failure(patched_coord, partitions):
|
||||
_f = Future()
|
||||
patched_coord._client.send.return_value = _f
|
||||
future = patched_coord._send_offset_fetch_request(partitions)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
error = Exception()
|
||||
_f.failure(error)
|
||||
patched_coord._failed_request.assert_called_with(0, request, future, error)
|
||||
assert future.failed()
|
||||
assert future.exception is error
|
||||
|
||||
|
||||
def test_send_offset_fetch_request_success(patched_coord, partitions):
|
||||
_f = Future()
|
||||
patched_coord._client.send.return_value = _f
|
||||
future = patched_coord._send_offset_fetch_request(partitions)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])])
|
||||
_f.success(response)
|
||||
patched_coord._handle_offset_fetch_response.assert_called_with(
|
||||
future, response)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('response,error,dead,reassign', [
|
||||
#(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
|
||||
# Errors.GroupAuthorizationFailedError, False, False),
|
||||
#(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
|
||||
# Errors.RequestTimedOutError, True, False),
|
||||
#(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
|
||||
# Errors.RebalanceInProgressError, False, True),
|
||||
(OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
|
||||
Errors.GroupLoadInProgressError, False, False),
|
||||
(OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
|
||||
Errors.NotCoordinatorForGroupError, True, False),
|
||||
(OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
|
||||
Errors.UnknownMemberIdError, False, True),
|
||||
(OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
|
||||
Errors.IllegalGenerationError, False, True),
|
||||
(OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
|
||||
Errors.TopicAuthorizationFailedError, False, False),
|
||||
(OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
|
||||
None, False, False),
|
||||
])
|
||||
def test_handle_offset_fetch_response(patched_coord, offsets,
|
||||
response, error, dead, reassign):
|
||||
future = Future()
|
||||
patched_coord._handle_offset_fetch_response(future, response)
|
||||
if error is not None:
|
||||
assert isinstance(future.exception, error)
|
||||
else:
|
||||
assert future.succeeded()
|
||||
assert future.value == offsets
|
||||
assert patched_coord.coordinator_id is (None if dead else 0)
|
||||
assert patched_coord._subscription.needs_partition_assignment is reassign
|
||||
Reference in New Issue
Block a user