KAFKA-3197: when max.in.flight.request.per.connection = 1, attempt to guarantee ordering (#698)

Dana Powers
2016-05-22 19:21:35 -07:00
parent 42293725e5
commit b000303045
3 changed files with 33 additions and 11 deletions
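
In user terms, nothing new needs to be configured: the ordering guarantee is derived from the existing max_in_flight_requests_per_connection producer option. A minimal usage sketch (broker address is hypothetical):

    from kafka import KafkaProducer

    # With a single in-flight request per connection, a failed batch is
    # retried before any later batch is sent, so per-partition ordering
    # survives retries.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        retries=5,
        max_in_flight_requests_per_connection=1,
    )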


@@ -286,7 +286,9 @@ class KafkaProducer(object):
         message_version = 1 if self.config['api_version'] >= (0, 10) else 0
         self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
+        guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
         self._sender = Sender(client, self._metadata, self._accumulator,
+                              guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True
         self._sender.start()


@@ -150,7 +150,6 @@ class RecordAccumulator(object):
                 self.config[key] = configs.pop(key)
 
         self._closed = False
-        self._drain_index = 0
         self._flushes_in_progress = AtomicInteger()
         self._appends_in_progress = AtomicInteger()
         self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
@@ -158,6 +157,10 @@ class RecordAccumulator(object):
         self._free = SimpleBufferPool(self.config['buffer_memory'],
                                       self.config['batch_size'])
         self._incomplete = IncompleteRecordBatches()
+        # The following variables should only be accessed by the sender thread,
+        # so we don't need to protect them w/ locking.
+        self.muted = set()
+        self._drain_index = 0
 
     def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
         """Add a record to the accumulator, return the append result.
@@ -304,16 +307,20 @@ class RecordAccumulator(object):
         Also return the flag for whether there are any unknown leaders for the
         accumulated partition batches.
 
-        A destination node is ready to send data if ANY one of its partition is
-        not backing off the send and ANY of the following are true:
+        A destination node is ready to send if:
 
-         * The record set is full
-         * The record set has sat in the accumulator for at least linger_ms
-           milliseconds
-         * The accumulator is out of memory and threads are blocking waiting
-           for data (in this case all partitions are immediately considered
-           ready).
-         * The accumulator has been closed
+         * There is at least one partition that is not backing off its send
+         * and those partitions are not muted (to prevent reordering if
+           max_in_flight_connections is set to 1)
+         * and any of the following are true:
+
+           * The record set is full
+           * The record set has sat in the accumulator for at least linger_ms
+             milliseconds
+           * The accumulator is out of memory and threads are blocking waiting
+             for data (in this case all partitions are immediately considered
+             ready).
+           * The accumulator has been closed
 
         Arguments:
             cluster (ClusterMetadata):
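
Restated as code, the readiness rule in the updated docstring is an any() over a node's partitions. This is a paraphrase of the documented conditions, not the method body; every predicate name here is hypothetical:

    def node_ready(partitions, muted, backing_off, batch_full, linger_expired,
                   memory_exhausted=False, closed=False):
        # Ready if some partition is sendable and not gated by backoff or muting.
        return any(
            tp not in muted
            and not backing_off(tp)
            and (batch_full(tp) or linger_expired(tp)
                 or memory_exhausted or closed)
            for tp in partitions
        )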
@@ -341,6 +348,8 @@ class RecordAccumulator(object):
                 continue
             elif leader in ready_nodes:
                 continue
+            elif tp in self.muted:
+                continue
 
             with self._tp_locks[tp]:
                 dq = self._batches[tp]
@@ -410,7 +419,7 @@ class RecordAccumulator(object):
             start = self._drain_index
             while True:
                 tp = partitions[self._drain_index]
-                if tp in self._batches:
+                if tp in self._batches and tp not in self.muted:
                     with self._tp_locks[tp]:
                         dq = self._batches[tp]
                         if dq:
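
For intuition, a toy version of the round-robin drain with the new muted check (heavily simplified; the real method also honors max_request_size, retry backoff, and per-partition locks):

    import collections

    def drain_node(partitions, batches, muted, start_index):
        """Take at most one batch per unmuted partition, round-robin from start_index."""
        drained = []
        i = start_index
        for _ in range(len(partitions)):
            tp = partitions[i]
            dq = batches.get(tp)
            if dq and tp not in muted:
                drained.append(dq.popleft())
            i = (i + 1) % len(partitions)
        return drained, i  # caller stores i back as _drain_index

    batches = {'t-0': collections.deque(['batch1']),
               't-1': collections.deque(['batch2'])}
    drained, nxt = drain_node(['t-0', 't-1'], batches, muted={'t-1'}, start_index=0)
    # drained == ['batch1']; 't-1' was skipped because it is muted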


@@ -26,6 +26,7 @@ class Sender(threading.Thread):
         'acks': 1,
         'retries': 0,
         'request_timeout_ms': 30000,
+        'guarantee_message_order': False,
         'client_id': 'kafka-python-' + __version__,
         'api_version': (0, 8, 0),
     }
@@ -110,6 +111,12 @@ class Sender(threading.Thread):
         batches_by_node = self._accumulator.drain(
             self._metadata, ready_nodes, self.config['max_request_size'])
 
+        if self.config['guarantee_message_order']:
+            # Mute all the partitions drained
+            for batch_list in six.itervalues(batches_by_node):
+                for batch in batch_list:
+                    self._accumulator.muted.add(batch.topic_partition)
+
         expired_batches = self._accumulator.abort_expired_batches(
             self.config['request_timeout_ms'], self._metadata)
 
@@ -222,6 +229,10 @@ class Sender(threading.Thread):
             if getattr(error, 'invalid_metadata', False):
                 self._metadata.request_update()
 
+        # Unmute the completed partition.
+        if self.config['guarantee_message_order']:
+            self._accumulator.muted.remove(batch.topic_partition)
+
     def _can_retry(self, batch, error):
         """
         We can retry a send if the error is transient and the number of
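
Taken together, the Sender-side changes implement a per-partition token: mute on drain, unmute on completion, so at most one batch per partition is ever in flight. A self-contained sketch of that lifecycle (stand-in classes, not the library's actual methods):

    import collections

    class MiniAccumulator:
        """Stand-in for RecordAccumulator: one deque of batches per partition."""
        def __init__(self):
            self.muted = set()
            self.batches = collections.defaultdict(collections.deque)

        def drain(self):
            # Take at most one batch per partition that is not muted.
            return [(tp, dq.popleft()) for tp, dq in self.batches.items()
                    if dq and tp not in self.muted]

    def send_step(acc, guarantee_message_order=True):
        in_flight = acc.drain()
        if guarantee_message_order:
            for tp, _batch in in_flight:
                acc.muted.add(tp)          # mute on drain
        return in_flight

    def complete(acc, tp, guarantee_message_order=True):
        if guarantee_message_order:
            acc.muted.remove(tp)           # unmute on completion

    acc = MiniAccumulator()
    acc.batches['topic-0'].extend(['b1', 'b2'])
    send_step(acc)                 # drains b1, mutes topic-0
    assert send_step(acc) == []    # b2 waits: topic-0 is muted
    complete(acc, 'topic-0')       # response for b1 handled
    send_step(acc)                 # now b2 is drained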