Merge pull request #590 from dpkp/accumulator_bugfix
Fix producer threading bug that can crash sender
This commit is contained in:
@@ -248,11 +248,12 @@ class RecordAccumulator(object):
|
||||
expired_batches = []
|
||||
to_remove = []
|
||||
count = 0
|
||||
for tp, dq in six.iteritems(self._batches):
|
||||
for tp in list(self._batches.keys()):
|
||||
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
|
||||
with self._tp_locks[tp]:
|
||||
# iterate over the batches and expire them if they have stayed
|
||||
# in accumulator for more than request_timeout_ms
|
||||
dq = self._batches[tp]
|
||||
for batch in dq:
|
||||
# check if the batch is expired
|
||||
if batch.maybe_expire(request_timeout_ms,
|
||||
@@ -367,8 +368,9 @@ class RecordAccumulator(object):
|
||||
|
||||
def has_unsent(self):
|
||||
"""Return whether there is any unsent record in the accumulator."""
|
||||
for tp, dq in six.iteritems(self._batches):
|
||||
for tp in list(self._batches.keys()):
|
||||
with self._tp_locks[tp]:
|
||||
dq = self._batches[tp]
|
||||
if len(dq):
|
||||
return True
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user