Add ClusterMetadata.add_group_coordinator()
This commit is contained in:
@@ -141,6 +141,45 @@ class ClusterMetadata(object):
|
||||
"""Remove a previously added listener callback"""
|
||||
self._listeners.remove(listener)
|
||||
|
||||
def add_group_coordinator(self, group, response):
|
||||
"""Update with metadata for a group coordinator
|
||||
|
||||
group: name of group from GroupCoordinatorRequest
|
||||
response: GroupCoordinatorResponse
|
||||
|
||||
returns True if metadata is updated, False on error
|
||||
"""
|
||||
log.debug("Updating coordinator for %s: %s", group, response)
|
||||
error_type = Errors.for_code(response.error_code)
|
||||
if error_type is not Errors.NoError:
|
||||
log.error("GroupCoordinatorResponse error: %s", error_type)
|
||||
self._groups[group] = -1
|
||||
return False
|
||||
|
||||
node_id = response.coordinator_id
|
||||
coordinator = BrokerMetadata(
|
||||
response.coordinator_id,
|
||||
response.host,
|
||||
response.port)
|
||||
|
||||
# Assume that group coordinators are just brokers
|
||||
# (this is true now, but could diverge in future)
|
||||
if node_id not in self._brokers:
|
||||
self._brokers[node_id] = coordinator
|
||||
|
||||
# If this happens, either brokers have moved without
|
||||
# changing IDs, or our assumption above is wrong
|
||||
elif coordinator != self._brokers[node_id]:
|
||||
log.error("GroupCoordinator metadata conflicts with existing"
|
||||
" broker metadata. Coordinator: %s, Broker: %s",
|
||||
coordinator, self._brokers[node_id])
|
||||
self._groups[group] = node_id
|
||||
return False
|
||||
|
||||
log.info("Group coordinator for %s is %s", group, coordinator)
|
||||
self._groups[group] = node_id
|
||||
return True
|
||||
|
||||
def __str__(self):
|
||||
return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \
|
||||
(len(self._brokers), len(self._partitions), len(self._groups))
|
||||
|
||||
Reference in New Issue
Block a user