Make sure all consumers are in same generation before stopping group test
This commit is contained in:
@@ -76,10 +76,23 @@ def test_group(kafka_broker, topic):
|
||||
timeout = time.time() + 35
|
||||
while True:
|
||||
for c in range(num_consumers):
|
||||
|
||||
# Verify all consumers have been created
|
||||
if c not in consumers:
|
||||
break
|
||||
|
||||
# Verify all consumers have an assignment
|
||||
elif not consumers[c].assignment():
|
||||
break
|
||||
|
||||
# Verify all consumers are in the same generation
|
||||
generations = set()
|
||||
for consumer in six.itervalues(consumers):
|
||||
generations.add(consumer._coordinator.generation)
|
||||
if len(generations) != 1:
|
||||
break
|
||||
|
||||
# If all checks passed, log state and break while loop
|
||||
else:
|
||||
for c in range(num_consumers):
|
||||
logging.info("[%s] %s %s: %s", c,
|
||||
|
||||
Reference in New Issue
Block a user