Merge pull request #403 from dpkp/client_request_response_ordering
Client request response ordering
This commit is contained in:
		| @@ -138,7 +138,8 @@ class KafkaClient(object): | ||||
|         Arguments: | ||||
|  | ||||
|         payloads: list of object-like entities with a topic (str) and | ||||
|             partition (int) attribute | ||||
|             partition (int) attribute; payloads with duplicate topic-partitions | ||||
|             are not supported. | ||||
|  | ||||
|         encode_fn: a method to encode the list of payloads to a request body, | ||||
|             must accept client_id, correlation_id, and payloads as | ||||
| @@ -152,6 +153,10 @@ class KafkaClient(object): | ||||
|  | ||||
|         List of response objects in the same order as the supplied payloads | ||||
|         """ | ||||
|         # encoders / decoders do not maintain ordering currently | ||||
|         # so we need to keep this so we can rebuild order before returning | ||||
|         original_ordering = [(p.topic, p.partition) for p in payloads] | ||||
|  | ||||
|         # Group the requests by topic+partition | ||||
|         brokers_for_payloads = [] | ||||
|         payloads_by_broker = collections.defaultdict(list) | ||||
| @@ -165,7 +170,7 @@ class KafkaClient(object): | ||||
|  | ||||
|         # For each broker, send the list of request payloads | ||||
|         # and collect the responses and errors | ||||
|         responses_by_broker = collections.defaultdict(list) | ||||
|         responses = {} | ||||
|         broker_failures = [] | ||||
|         for broker, payloads in payloads_by_broker.items(): | ||||
|             requestId = self._next_id() | ||||
| @@ -184,7 +189,8 @@ class KafkaClient(object): | ||||
|                             'to server %s: %s', requestId, broker, e) | ||||
|  | ||||
|                 for payload in payloads: | ||||
|                     responses_by_broker[broker].append(FailedPayloadsError(payload)) | ||||
|                     topic_partition = (payload.topic, payload.partition) | ||||
|                     responses[topic_partition] = FailedPayloadsError(payload) | ||||
|  | ||||
|             # No exception, try to get response | ||||
|             else: | ||||
| @@ -196,7 +202,8 @@ class KafkaClient(object): | ||||
|                     log.debug('Request %s does not expect a response ' | ||||
|                               '(skipping conn.recv)', requestId) | ||||
|                     for payload in payloads: | ||||
|                         responses_by_broker[broker].append(None) | ||||
|                         topic_partition = (payload.topic, payload.partition) | ||||
|                         responses[topic_partition] = None | ||||
|                     continue | ||||
|  | ||||
|                 try: | ||||
| @@ -208,12 +215,17 @@ class KafkaClient(object): | ||||
|                                 requestId, broker, e) | ||||
|  | ||||
|                     for payload in payloads: | ||||
|                         responses_by_broker[broker].append(FailedPayloadsError(payload)) | ||||
|                         topic_partition = (payload.topic, payload.partition) | ||||
|                         responses[topic_partition] = FailedPayloadsError(payload) | ||||
|  | ||||
|                 else: | ||||
|                     _resps = [] | ||||
|                     for payload_response in decoder_fn(response): | ||||
|                         responses_by_broker[broker].append(payload_response) | ||||
|                     log.debug('Response %s: %s', requestId, responses_by_broker[broker]) | ||||
|                         topic_partition = (payload_response.topic, | ||||
|                                            payload_response.partition) | ||||
|                         responses[topic_partition] = payload_response | ||||
|                         _resps.append(payload_response) | ||||
|                     log.debug('Response %s: %s', requestId, _resps) | ||||
|  | ||||
|         # Connection errors generally mean stale metadata | ||||
|         # although sometimes it means incorrect api request | ||||
| @@ -223,9 +235,7 @@ class KafkaClient(object): | ||||
|             self.reset_all_metadata() | ||||
|  | ||||
|         # Return responses in the same order as provided | ||||
|         responses_by_payload = [responses_by_broker[broker].pop(0) | ||||
|                                 for broker in brokers_for_payloads] | ||||
|         return responses_by_payload | ||||
|         return [responses[tp] for tp in original_ordering] | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return '<KafkaClient client_id=%s>' % (self.client_id) | ||||
|   | ||||
| @@ -2,8 +2,9 @@ import os | ||||
|  | ||||
| from kafka.common import ( | ||||
|     FetchRequest, OffsetCommitRequest, OffsetFetchRequest, | ||||
|     KafkaTimeoutError | ||||
|     KafkaTimeoutError, ProduceRequest | ||||
| ) | ||||
| from kafka.protocol import create_message | ||||
|  | ||||
| from test.fixtures import ZookeeperFixture, KafkaFixture | ||||
| from test.testutil import KafkaIntegrationTestCase, kafka_versions | ||||
| @@ -49,6 +50,35 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): | ||||
|         with self.assertRaises(KafkaTimeoutError): | ||||
|             self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0) | ||||
|  | ||||
|     @kafka_versions('all') | ||||
|     def test_send_produce_request_maintains_request_response_order(self): | ||||
|  | ||||
|         self.client.ensure_topic_exists(b'foo', timeout=1) | ||||
|         self.client.ensure_topic_exists(b'bar', timeout=1) | ||||
|  | ||||
|         requests = [ | ||||
|             ProduceRequest( | ||||
|                 b'foo', 0, | ||||
|                 [create_message(b'a'), create_message(b'b')]), | ||||
|             ProduceRequest( | ||||
|                 b'bar', 1, | ||||
|                 [create_message(b'a'), create_message(b'b')]), | ||||
|             ProduceRequest( | ||||
|                 b'foo', 1, | ||||
|                 [create_message(b'a'), create_message(b'b')]), | ||||
|             ProduceRequest( | ||||
|                 b'bar', 0, | ||||
|                 [create_message(b'a'), create_message(b'b')]), | ||||
|         ] | ||||
|  | ||||
|         responses = self.client.send_produce_request(requests) | ||||
|         while len(responses): | ||||
|             request = requests.pop() | ||||
|             response = responses.pop() | ||||
|             self.assertEqual(request.topic, response.topic) | ||||
|             self.assertEqual(request.partition, response.partition) | ||||
|  | ||||
|  | ||||
|     #################### | ||||
|     #   Offset Tests   # | ||||
|     #################### | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Dana Powers
					Dana Powers