ConsumerCoordinator cleanups

- default assignors to RoundRobinPartitionAssignor
  - check offsets types in commit_offsets_* methods
  - succeed future in _send_offset_commit_request when no offsets
  - raise exception if no subscribed topics in group_protocols()
  - fix _subscription typo in metadata listener callbacks
  - short circuit if no partitions passed to fetch_committed_offsets
  - line-wrap comments
  - return future from commit_offsets_async
  - return future value from commit_offsets_sync
  - fix self._failed_request callback partial args
  - comment out metrics class for now
This commit is contained in:
Dana Powers
2016-01-10 00:25:54 -08:00
parent 35ed2e75da
commit bbd6444e85
2 changed files with 36 additions and 13 deletions

View File

@@ -621,7 +621,7 @@ class HeartbeatTask(object):
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0 etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, etd) self._client.schedule(self, etd)
'''
class GroupCoordinatorMetrics(object): class GroupCoordinatorMetrics(object):
def __init__(self, metrics, prefix, tags=None): def __init__(self, metrics, prefix, tags=None):
self.metrics = metrics self.metrics = metrics
@@ -674,5 +674,4 @@ class GroupCoordinatorMetrics(object):
"The number of seconds since the last controller heartbeat", "The number of seconds since the last controller heartbeat",
tags), lastHeartbeat) tags), lastHeartbeat)
""" """
'''

View File

@@ -8,6 +8,7 @@ import time
import six import six
from .base import BaseCoordinator from .base import BaseCoordinator
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ( from .protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
ConsumerProtocol) ConsumerProtocol)
@@ -29,7 +30,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True, 'enable_auto_commit': True,
'auto_commit_interval_ms': 5000, 'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True, 'default_offset_commit_callback': lambda offsets, response: True,
'assignors': (), 'assignors': (RoundRobinPartitionAssignor,),
'session_timeout_ms': 30000, 'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000, 'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100, 'retry_backoff_ms': 100,
@@ -100,6 +101,7 @@ class ConsumerCoordinator(BaseCoordinator):
def group_protocols(self): def group_protocols(self):
"""Returns list of preferred (protocols, metadata)""" """Returns list of preferred (protocols, metadata)"""
topics = self._subscription.subscription topics = self._subscription.subscription
assert topics is not None, 'Consumer has not subscribed to topics'
metadata_list = [] metadata_list = []
for assignor in self.config['assignors']: for assignor in self.config['assignors']:
metadata = assignor.metadata(topics) metadata = assignor.metadata(topics)
@@ -111,7 +113,7 @@ class ConsumerCoordinator(BaseCoordinator):
# if we encounter any unauthorized topics, raise an exception # if we encounter any unauthorized topics, raise an exception
# TODO # TODO
#if self._cluster.unauthorized_topics: #if self._cluster.unauthorized_topics:
# raise Errors.TopicAuthorizationError(self._cluster.unauthorized_topics) # raise TopicAuthorizationError(self._cluster.unauthorized_topics)
if self._subscription.subscribed_pattern: if self._subscription.subscribed_pattern:
topics = [] topics = []
@@ -122,7 +124,8 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.change_subscription(topics) self._subscription.change_subscription(topics)
self._client.set_topics(self._subscription.group_subscription()) self._client.set_topics(self._subscription.group_subscription())
# check if there are any changes to the metadata which should trigger a rebalance # check if there are any changes to the metadata which should trigger
# a rebalance
if self._subscription_metadata_changed(): if self._subscription_metadata_changed():
if self.config['api_version'] >= (0, 9): if self.config['api_version'] >= (0, 9):
self._subscription.mark_for_reassignment() self._subscription.mark_for_reassignment()
@@ -182,7 +185,7 @@ class ConsumerCoordinator(BaseCoordinator):
# execute the user's callback after rebalance # execute the user's callback after rebalance
if self._subscription.listener: if self._subscription.listener:
try: try:
self._subscriptions.listener.on_partitions_assigned(assigned) self._subscription.listener.on_partitions_assigned(assigned)
except Exception: except Exception:
log.exception("User provided listener failed on partition" log.exception("User provided listener failed on partition"
" assignment: %s", assigned) " assignment: %s", assigned)
@@ -263,6 +266,9 @@ class ConsumerCoordinator(BaseCoordinator):
Returns: Returns:
dict: {TopicPartition: OffsetAndMetadata} dict: {TopicPartition: OffsetAndMetadata}
""" """
if not partitions:
return {}
while True: while True:
if self.config['api_version'] >= (0, 8, 2): if self.config['api_version'] >= (0, 8, 2):
self.ensure_coordinator_known() self.ensure_coordinator_known()
@@ -297,11 +303,16 @@ class ConsumerCoordinator(BaseCoordinator):
Returns: Returns:
Future: indicating whether the commit was successful or not Future: indicating whether the commit was successful or not
""" """
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
offsets.values()))
if callback is None: if callback is None:
callback = self.config['default_offset_commit_callback'] callback = self.config['default_offset_commit_callback']
self._subscription.needs_fetch_committed_offsets = True self._subscription.needs_fetch_committed_offsets = True
future = self._send_offset_commit_request(offsets) future = self._send_offset_commit_request(offsets)
future.add_both(callback, offsets) future.add_both(callback, offsets)
return future
def commit_offsets_sync(self, offsets): def commit_offsets_sync(self, offsets):
"""Commit specific offsets synchronously. """Commit specific offsets synchronously.
@@ -314,6 +325,10 @@ class ConsumerCoordinator(BaseCoordinator):
Raises error on failure Raises error on failure
""" """
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
offsets.values()))
if not offsets: if not offsets:
return return
@@ -325,7 +340,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._client.poll(future=future) self._client.poll(future=future)
if future.succeeded(): if future.succeeded():
return return future.value
if not future.retriable(): if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type raise future.exception # pylint: disable-msg=raising-bad-type
@@ -369,6 +384,13 @@ class ConsumerCoordinator(BaseCoordinator):
Returns: Returns:
Future: indicating whether the commit was successful or not Future: indicating whether the commit was successful or not
""" """
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
offsets.values()))
if not offsets:
return Future().success(None)
if self.config['api_version'] >= (0, 8, 2): if self.config['api_version'] >= (0, 8, 2):
if self.coordinator_unknown(): if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError) return Future().failure(Errors.GroupCoordinatorNotAvailableError)
@@ -376,9 +398,6 @@ class ConsumerCoordinator(BaseCoordinator):
else: else:
node_id = self._client.least_loaded_node() node_id = self._client.least_loaded_node()
if not offsets:
return Future().failure(None)
# create the offset commit request # create the offset commit request
offset_data = collections.defaultdict(dict) offset_data = collections.defaultdict(dict)
for tp, offset in six.iteritems(offsets): for tp, offset in six.iteritems(offsets):
@@ -428,7 +447,7 @@ class ConsumerCoordinator(BaseCoordinator):
future = Future() future = Future()
_f = self._client.send(node_id, request) _f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_commit_response, offsets, future) _f.add_callback(self._handle_offset_commit_response, offsets, future)
_f.add_errback(self._failed_request, future) _f.add_errback(self._failed_request, node_id, request, future)
return future return future
def _handle_offset_commit_response(self, offsets, future, response): def _handle_offset_commit_response(self, offsets, future, response):
@@ -513,6 +532,11 @@ class ConsumerCoordinator(BaseCoordinator):
Returns: Returns:
Future: resolves to dict of offsets: {TopicPartition: int} Future: resolves to dict of offsets: {TopicPartition: int}
""" """
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
if not partitions:
return Future().success({})
if self.config['api_version'] >= (0, 8, 2): if self.config['api_version'] >= (0, 8, 2):
if self.coordinator_unknown(): if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError) return Future().failure(Errors.GroupCoordinatorNotAvailableError)
@@ -541,7 +565,7 @@ class ConsumerCoordinator(BaseCoordinator):
future = Future() future = Future()
_f = self._client.send(node_id, request) _f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_fetch_response, future) _f.add_callback(self._handle_offset_fetch_response, future)
_f.add_errback(self._failed_request, future) _f.add_errback(self._failed_request, node_id, request, future)
return future return future
def _handle_offset_fetch_response(self, future, response): def _handle_offset_fetch_response(self, future, response):