Init responses before we use it.
This commit is contained in:
@@ -161,6 +161,7 @@ class KafkaClient(object):
|
|||||||
brokers_for_payloads = []
|
brokers_for_payloads = []
|
||||||
payloads_by_broker = collections.defaultdict(list)
|
payloads_by_broker = collections.defaultdict(list)
|
||||||
|
|
||||||
|
responses = {}
|
||||||
for payload in payloads:
|
for payload in payloads:
|
||||||
try:
|
try:
|
||||||
leader = self._get_leader_for_partition(payload.topic,
|
leader = self._get_leader_for_partition(payload.topic,
|
||||||
@@ -175,7 +176,6 @@ class KafkaClient(object):
|
|||||||
|
|
||||||
# For each broker, send the list of request payloads
|
# For each broker, send the list of request payloads
|
||||||
# and collect the responses and errors
|
# and collect the responses and errors
|
||||||
responses = {}
|
|
||||||
broker_failures = []
|
broker_failures = []
|
||||||
for broker, payloads in payloads_by_broker.items():
|
for broker, payloads in payloads_by_broker.items():
|
||||||
requestId = self._next_id()
|
requestId = self._next_id()
|
||||||
|
|||||||
Reference in New Issue
Block a user