Fix concurrency bug in RecordAccumulator.ready()
This commit is contained in:
@@ -320,8 +320,11 @@ class RecordAccumulator(object):
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
||||||
exhausted = bool(self._free.queued() > 0)
|
exhausted = bool(self._free.queued() > 0)
|
||||||
for tp, dq in six.iteritems(self._batches):
|
# several threads are accessing self._batches -- to simplify
|
||||||
|
# concurrent access, we iterate over a snapshot of partitions
|
||||||
|
# and lock each partition separately as needed
|
||||||
|
partitions = list(self._batches.keys())
|
||||||
|
for tp in partitions:
|
||||||
leader = cluster.leader_for_partition(tp)
|
leader = cluster.leader_for_partition(tp)
|
||||||
if leader is None or leader == -1:
|
if leader is None or leader == -1:
|
||||||
unknown_leaders_exist = True
|
unknown_leaders_exist = True
|
||||||
@@ -330,6 +333,7 @@ class RecordAccumulator(object):
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
with self._tp_locks[tp]:
|
with self._tp_locks[tp]:
|
||||||
|
dq = self._batches[tp]
|
||||||
if not dq:
|
if not dq:
|
||||||
continue
|
continue
|
||||||
batch = dq[0]
|
batch = dq[0]
|
||||||
|
|||||||
Reference in New Issue
Block a user