190 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			190 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import absolute_import
 | |
| 
 | |
| import copy
 | |
| import logging
 | |
| import random
 | |
| import time
 | |
| 
 | |
| import kafka.common as Errors
 | |
| from kafka.common import BrokerMetadata
 | |
| from .future import Future
 | |
| 
 | |
| log = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class ClusterMetadata(object):
 | |
|     DEFAULT_CONFIG = {
 | |
|         'retry_backoff_ms': 100,
 | |
|         'metadata_max_age_ms': 300000,
 | |
|     }
 | |
| 
 | |
|     def __init__(self, **configs):
 | |
|         self._brokers = {}
 | |
|         self._partitions = {}
 | |
|         self._groups = {}
 | |
|         self._version = 0
 | |
|         self._last_refresh_ms = 0
 | |
|         self._last_successful_refresh_ms = 0
 | |
|         self._need_update = False
 | |
|         self._future = None
 | |
|         self._listeners = set()
 | |
| 
 | |
|         self.config = copy.copy(self.DEFAULT_CONFIG)
 | |
|         for key in self.config:
 | |
|             if key in configs:
 | |
|                 self.config[key] = configs[key]
 | |
| 
 | |
|     def brokers(self):
 | |
|         return set(self._brokers.values())
 | |
| 
 | |
|     def broker_metadata(self, broker_id):
 | |
|         return self._brokers.get(broker_id)
 | |
| 
 | |
|     def partitions_for_topic(self, topic):
 | |
|         if topic not in self._partitions:
 | |
|             return None
 | |
|         return set(self._partitions[topic].keys())
 | |
| 
 | |
|     def leader_for_partition(self, partition):
 | |
|         if partition.topic not in self._partitions:
 | |
|             return None
 | |
|         return self._partitions[partition.topic].get(partition.partition)
 | |
| 
 | |
|     def coordinator_for_group(self, group):
 | |
|         return self._groups.get(group)
 | |
| 
 | |
|     def ttl(self):
 | |
|         """Milliseconds until metadata should be refreshed"""
 | |
|         now = time.time() * 1000
 | |
|         if self._need_update:
 | |
|             ttl = 0
 | |
|         else:
 | |
|             ttl = self._last_successful_refresh_ms + self.config['metadata_max_age_ms'] - now
 | |
|         retry = self._last_refresh_ms + self.config['retry_backoff_ms'] - now
 | |
|         return max(ttl, retry, 0)
 | |
| 
 | |
|     def request_update(self):
 | |
|         """
 | |
|         Flags metadata for update, return Future()
 | |
| 
 | |
|         Actual update must be handled separately. This method will only
 | |
|         change the reported ttl()
 | |
|         """
 | |
|         self._need_update = True
 | |
|         if not self._future or self._future.is_done:
 | |
|           self._future = Future()
 | |
|         return self._future
 | |
| 
 | |
|     def topics(self):
 | |
|         return set(self._partitions.keys())
 | |
| 
 | |
|     def failed_update(self, exception):
 | |
|         if self._future:
 | |
|             self._future.failure(exception)
 | |
|             self._future = None
 | |
|         self._last_refresh_ms = time.time() * 1000
 | |
| 
 | |
|     def update_metadata(self, metadata):
 | |
|         # In the common case where we ask for a single topic and get back an
 | |
|         # error, we should fail the future
 | |
|         if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
 | |
|             error_code, topic, _ = metadata.topics[0]
 | |
|             error = Errors.for_code(error_code)(topic)
 | |
|             return self.failed_update(error)
 | |
| 
 | |
|         if not metadata.brokers:
 | |
|             log.warning("No broker metadata found in MetadataResponse")
 | |
| 
 | |
|         for node_id, host, port in metadata.brokers:
 | |
|             self._brokers.update({
 | |
|                 node_id: BrokerMetadata(node_id, host, port)
 | |
|             })
 | |
| 
 | |
|         # Drop any UnknownTopic, InvalidTopic, and TopicAuthorizationFailed
 | |
|         # but retain LeaderNotAvailable because it means topic is initializing
 | |
|         self._partitions = {}
 | |
| 
 | |
|         for error_code, topic, partitions in metadata.topics:
 | |
|             error_type = Errors.for_code(error_code)
 | |
|             if error_type is Errors.NoError:
 | |
|                 self._partitions[topic] = {}
 | |
|                 for _, partition, leader, _, _ in partitions:
 | |
|                     self._partitions[topic][partition] = leader
 | |
|             elif error_type is Errors.LeaderNotAvailableError:
 | |
|                 log.error("Topic %s is not available during auto-create"
 | |
|                           " initialization", topic)
 | |
|             elif error_type is Errors.UnknownTopicOrPartitionError:
 | |
|                 log.error("Topic %s not found in cluster metadata", topic)
 | |
|             elif error_type is Errors.TopicAuthorizationFailedError:
 | |
|                 log.error("Topic %s is not authorized for this client", topic)
 | |
|             elif error_type is Errors.InvalidTopicError:
 | |
|                 log.error("'%s' is not a valid topic name", topic)
 | |
|             else:
 | |
|                 log.error("Error fetching metadata for topic %s: %s",
 | |
|                           topic, error_type)
 | |
| 
 | |
|         if self._future:
 | |
|             self._future.success(self)
 | |
|         self._future = None
 | |
|         self._need_update = False
 | |
|         self._version += 1
 | |
|         now = time.time() * 1000
 | |
|         self._last_refresh_ms = now
 | |
|         self._last_successful_refresh_ms = now
 | |
|         log.debug("Updated cluster metadata version %d to %s",
 | |
|                   self._version, self)
 | |
| 
 | |
|         for listener in self._listeners:
 | |
|             listener(self)
 | |
| 
 | |
|     def add_listener(self, listener):
 | |
|         """Add a callback function to be called on each metadata update"""
 | |
|         self._listeners.add(listener)
 | |
| 
 | |
|     def remove_listener(self, listener):
 | |
|         """Remove a previously added listener callback"""
 | |
|         self._listeners.remove(listener)
 | |
| 
 | |
|     def add_group_coordinator(self, group, response):
 | |
|         """Update with metadata for a group coordinator
 | |
| 
 | |
|         group: name of group from GroupCoordinatorRequest
 | |
|         response: GroupCoordinatorResponse
 | |
| 
 | |
|         returns True if metadata is updated, False on error
 | |
|         """
 | |
|         log.debug("Updating coordinator for %s: %s", group, response)
 | |
|         error_type = Errors.for_code(response.error_code)
 | |
|         if error_type is not Errors.NoError:
 | |
|             log.error("GroupCoordinatorResponse error: %s", error_type)
 | |
|             self._groups[group] = -1
 | |
|             return False
 | |
| 
 | |
|         node_id = response.coordinator_id
 | |
|         coordinator = BrokerMetadata(
 | |
|             response.coordinator_id,
 | |
|             response.host,
 | |
|             response.port)
 | |
| 
 | |
|         # Assume that group coordinators are just brokers
 | |
|         # (this is true now, but could diverge in future)
 | |
|         if node_id not in self._brokers:
 | |
|             self._brokers[node_id] = coordinator
 | |
| 
 | |
|         # If this happens, either brokers have moved without
 | |
|         # changing IDs, or our assumption above is wrong
 | |
|         elif coordinator != self._brokers[node_id]:
 | |
|             log.error("GroupCoordinator metadata conflicts with existing"
 | |
|                       " broker metadata. Coordinator: %s, Broker: %s",
 | |
|                       coordinator, self._brokers[node_id])
 | |
|             self._groups[group] = node_id
 | |
|             return False
 | |
| 
 | |
|         log.info("Group coordinator for %s is %s", group, coordinator)
 | |
|         self._groups[group] = node_id
 | |
|         return True
 | |
| 
 | |
|     def __str__(self):
 | |
|         return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \
 | |
|                (len(self._brokers), len(self._partitions), len(self._groups))
 | 
