Fix fetch_max_bytes=1 consumer integration test
This commit is contained in:
@@ -604,20 +604,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
self.send_messages(0, range(100, 200))
|
self.send_messages(0, range(100, 200))
|
||||||
|
|
||||||
# Start a consumer. FetchResponse_v3 should always include at least 1
|
# Start a consumer. FetchResponse_v3 should always include at least 1
|
||||||
# full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
|
# full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
|
||||||
|
# But 0.11.0.0 returns 1 MessageSet at a time when the messages are
|
||||||
|
# stored in the new v2 format by the broker.
|
||||||
|
#
|
||||||
|
# DP Note: This is a strange test. The consumer shouldn't care
|
||||||
|
# how many messages are included in a FetchResponse, as long as it is
|
||||||
|
# non-zero. I would not mind if we deleted this test. It caused
|
||||||
|
# a minor headache when testing 0.11.0.0.
|
||||||
group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
|
group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
|
||||||
consumer = self.kafka_consumer(
|
consumer = self.kafka_consumer(
|
||||||
group_id=group,
|
group_id=group,
|
||||||
auto_offset_reset='earliest',
|
auto_offset_reset='earliest',
|
||||||
|
consumer_timeout_ms=5000,
|
||||||
fetch_max_bytes=1)
|
fetch_max_bytes=1)
|
||||||
fetched_msgs = []
|
|
||||||
# A bit hacky, but we need this in order for message count to be exact
|
|
||||||
consumer._coordinator.ensure_active_group()
|
|
||||||
for i in range(10):
|
|
||||||
poll_res = consumer.poll(timeout_ms=2000)
|
|
||||||
print(poll_res)
|
|
||||||
for partition, msgs in six.iteritems(poll_res):
|
|
||||||
for msg in msgs:
|
|
||||||
fetched_msgs.append(msg)
|
|
||||||
|
|
||||||
|
fetched_msgs = [next(consumer) for i in range(10)]
|
||||||
self.assertEqual(len(fetched_msgs), 10)
|
self.assertEqual(len(fetched_msgs), 10)
|
||||||
|
|||||||
Reference in New Issue
Block a user