Improve consumer group test loop
This commit is contained in:
@@ -87,21 +87,21 @@ def test_group(kafka_broker, topic):
|
||||
elif not consumers[c].assignment():
|
||||
break
|
||||
|
||||
# Verify all consumers are in the same generation
|
||||
generations = set()
|
||||
for consumer in six.itervalues(consumers):
|
||||
generations.add(consumer._coordinator.generation)
|
||||
if len(generations) != 1:
|
||||
break
|
||||
|
||||
# If all checks passed, log state and break while loop
|
||||
# If all consumers exist and have an assignment
|
||||
else:
|
||||
for c in range(num_consumers):
|
||||
logging.info("[%s] %s %s: %s", c,
|
||||
consumers[c]._coordinator.generation,
|
||||
consumers[c]._coordinator.member_id,
|
||||
consumers[c].assignment())
|
||||
break
|
||||
|
||||
# Verify all consumers are in the same generation
|
||||
# then log state and break while loop
|
||||
generations = set([consumer._coordinator.generation
|
||||
for consumer in list(consumers.values())])
|
||||
|
||||
if len(generations) == 1:
|
||||
for c, consumer in list(consumers.items()):
|
||||
logging.info("[%s] %s %s: %s", c,
|
||||
consumer._coordinator.generation,
|
||||
consumer._coordinator.member_id,
|
||||
consumer.assignment())
|
||||
break
|
||||
assert time.time() < timeout, "timeout waiting for assignments"
|
||||
|
||||
group_assignment = set()
|
||||
|
||||
Reference in New Issue
Block a user