join consumer threads in test_consumer_group cleanup
This commit is contained in:
@@ -52,6 +52,7 @@ def test_group(kafka_broker, topic):
|
|||||||
connect_str = 'localhost:' + str(kafka_broker.port)
|
connect_str = 'localhost:' + str(kafka_broker.port)
|
||||||
consumers = {}
|
consumers = {}
|
||||||
stop = {}
|
stop = {}
|
||||||
|
threads = {}
|
||||||
messages = collections.defaultdict(list)
|
messages = collections.defaultdict(list)
|
||||||
def consumer_thread(i):
|
def consumer_thread(i):
|
||||||
assert i not in consumers
|
assert i not in consumers
|
||||||
@@ -61,7 +62,7 @@ def test_group(kafka_broker, topic):
|
|||||||
bootstrap_servers=connect_str,
|
bootstrap_servers=connect_str,
|
||||||
heartbeat_interval_ms=500)
|
heartbeat_interval_ms=500)
|
||||||
while not stop[i].is_set():
|
while not stop[i].is_set():
|
||||||
for tp, records in six.itervalues(consumers[i].poll()):
|
for tp, records in six.itervalues(consumers[i].poll(100)):
|
||||||
messages[i][tp].extend(records)
|
messages[i][tp].extend(records)
|
||||||
consumers[i].close()
|
consumers[i].close()
|
||||||
del consumers[i]
|
del consumers[i]
|
||||||
@@ -70,8 +71,8 @@ def test_group(kafka_broker, topic):
|
|||||||
num_consumers = 4
|
num_consumers = 4
|
||||||
for i in range(num_consumers):
|
for i in range(num_consumers):
|
||||||
t = threading.Thread(target=consumer_thread, args=(i,))
|
t = threading.Thread(target=consumer_thread, args=(i,))
|
||||||
t.daemon = True
|
|
||||||
t.start()
|
t.start()
|
||||||
|
threads[i] = t
|
||||||
|
|
||||||
try:
|
try:
|
||||||
timeout = time.time() + 35
|
timeout = time.time() + 35
|
||||||
@@ -116,6 +117,7 @@ def test_group(kafka_broker, topic):
|
|||||||
finally:
|
finally:
|
||||||
for c in range(num_consumers):
|
for c in range(num_consumers):
|
||||||
stop[c].set()
|
stop[c].set()
|
||||||
|
threads[c].join()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
|||||||
Reference in New Issue
Block a user