Remove test_correlation_id_rollover; use daemon threads for test consumers
This commit is contained in:
@@ -29,14 +29,6 @@ def topic(simple_client):
|
|||||||
return topic
|
return topic
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def topic_with_messages(simple_client, topic):
|
|
||||||
producer = SimpleProducer(simple_client)
|
|
||||||
for i in six.moves.xrange(100):
|
|
||||||
producer.send_messages(topic, 'msg_%d' % i)
|
|
||||||
return topic
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
|
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
|
||||||
def test_consumer(kafka_broker, version):
|
def test_consumer(kafka_broker, version):
|
||||||
|
|
||||||
@@ -76,7 +68,9 @@ def test_group(kafka_broker, topic):
|
|||||||
|
|
||||||
num_consumers = 4
|
num_consumers = 4
|
||||||
for i in range(num_consumers):
|
for i in range(num_consumers):
|
||||||
threading.Thread(target=consumer_thread, args=(i,)).start()
|
t = threading.Thread(target=consumer_thread, args=(i,))
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
timeout = time.time() + 35
|
timeout = time.time() + 35
|
||||||
@@ -108,38 +102,3 @@ def test_group(kafka_broker, topic):
|
|||||||
finally:
|
finally:
|
||||||
for c in range(num_consumers):
|
for c in range(num_consumers):
|
||||||
stop[c].set()
|
stop[c].set()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
|
|
||||||
def test_correlation_id_rollover(kafka_broker):
|
|
||||||
logging.getLogger('kafka.conn').setLevel(logging.ERROR)
|
|
||||||
from kafka.protocol.metadata import MetadataRequest
|
|
||||||
conn = BrokerConnection('localhost', kafka_broker.port,
|
|
||||||
receive_buffer_bytes=131072,
|
|
||||||
max_in_flight_requests_per_connection=100)
|
|
||||||
req = MetadataRequest([])
|
|
||||||
while not conn.connected():
|
|
||||||
conn.connect()
|
|
||||||
futures = collections.deque()
|
|
||||||
start = time.time()
|
|
||||||
done = 0
|
|
||||||
for i in six.moves.xrange(2**13):
|
|
||||||
if not conn.can_send_more():
|
|
||||||
conn.recv(timeout=None)
|
|
||||||
futures.append(conn.send(req))
|
|
||||||
conn.recv()
|
|
||||||
while futures and futures[0].is_done:
|
|
||||||
f = futures.popleft()
|
|
||||||
if not f.succeeded():
|
|
||||||
raise f.exception
|
|
||||||
done += 1
|
|
||||||
if time.time() > start + 10:
|
|
||||||
print ("%d done" % done)
|
|
||||||
start = time.time()
|
|
||||||
|
|
||||||
while futures:
|
|
||||||
conn.recv()
|
|
||||||
if futures[0].is_done:
|
|
||||||
f = futures.popleft()
|
|
||||||
if not f.succeeded():
|
|
||||||
raise f.exception
|
|
||||||
|
|||||||
Reference in New Issue
Block a user