KafkaClient.add_topic() -- for use by async producer
This commit is contained in:
@@ -483,6 +483,21 @@ class KafkaClient(object):
|
||||
self._topics = set(topics)
|
||||
return future
|
||||
|
||||
def add_topic(self, topic):
|
||||
"""Add a topic to the list of topics tracked via metadata.
|
||||
|
||||
Arguments:
|
||||
topic (str): topic to track
|
||||
|
||||
Returns:
|
||||
Future: resolves after metadata request/response
|
||||
"""
|
||||
if topic in self._topics:
|
||||
return Future().success(set(self._topics))
|
||||
|
||||
self._topics.add(topic)
|
||||
return self.cluster.request_update()
|
||||
|
||||
# request metadata update on disconnect and timedout
|
||||
def _maybe_refresh_metadata(self):
|
||||
"""Send a metadata request if needed.
|
||||
|
Reference in New Issue
Block a user