Merge pull request #485 from dpkp/async_producer_stop

Producer.stop() should block until async thread completes
Dana Powers
2015-12-05 18:26:07 -08:00
3 changed files with 15 additions and 16 deletions
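For context, a minimal usage sketch of the behaviour this PR targets, assuming the legacy KafkaClient/SimpleProducer API of this era (broker address and topic are placeholders; `async` is still a valid keyword argument here, though it later became a reserved word in Python 3.7):

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')         # placeholder broker
    producer = SimpleProducer(client, async=True)  # sends happen on a background thread

    producer.send_messages(b'my-topic', b'one', b'two')

    # With this change, stop() enqueues STOP_ASYNC_PRODUCER, sets the stop
    # event, and joins the async thread with no timeout, so pending messages
    # can be flushed and stop() does not return until that thread has exited.
    producer.stop()
    client.close()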

kafka/client.py

@@ -607,11 +607,7 @@ class KafkaClient(object):
         else:
             decoder = KafkaProtocol.decode_produce_response

-        try:
-            resps = self._send_broker_aware_request(payloads, encoder, decoder)
-        except Exception:
-            if fail_on_error:
-                raise
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)

         return [resp if not callback else callback(resp) for resp in resps
                 if resp is not None and

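The removed handler above only re-raised when fail_on_error was set; without it, any error from _send_broker_aware_request now propagates to the caller unconditionally. A rough, hypothetical caller-side sketch of keeping the old swallow-and-continue behaviour where it is actually wanted (the helper name and logging are illustrative, not part of kafka-python):

    import logging

    log = logging.getLogger(__name__)

    def send_produce_ignoring_errors(client, payloads):
        # Hypothetical helper: transport-level errors now surface from
        # send_produce_request, so swallow them here if that is desired.
        try:
            return client.send_produce_request(payloads, fail_on_error=False)
        except Exception as exc:
            log.warning('produce request failed: %s', exc)
            return []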
kafka/producer/base.py

@@ -415,17 +415,22 @@ class Producer(object):
             raise
         return resp

-    def stop(self, timeout=1):
+    def stop(self):
         """
-        Stop the producer. Optionally wait for the specified timeout before
-        forcefully cleaning up.
+        Stop the producer (async mode). Blocks until async thread completes.
         """
+        if not self.async:
+            log.warning("producer.stop() called, but producer is not async")
+            return
+
+        if self.stopped:
+            log.warning("producer.stop() called, but producer is already stopped")
+            return
+
-        if self.async:
-            self.queue.put((STOP_ASYNC_PRODUCER, None, None))
-            self.thread.join(timeout)
-
-            if self.thread.is_alive():
-                self.thread_stop_event.set()
+        self.queue.put((STOP_ASYNC_PRODUCER, None, None))
+        self.thread_stop_event.set()
+        self.thread.join()

         if hasattr(self, '_cleanup_func'):
             # Remove cleanup handler now that we've stopped

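The new stop() signals the worker (sentinel message plus stop event) and then join()s with no timeout, so the caller blocks until the async thread has exited. A self-contained sketch of that shutdown pattern using plain threading primitives (names are illustrative, not kafka-python's internals):

    import queue
    import threading

    STOP = object()  # sentinel, analogous to STOP_ASYNC_PRODUCER

    def worker(q, stop_event):
        while True:
            try:
                item = q.get(timeout=0.1)
            except queue.Empty:
                if stop_event.is_set():
                    break        # queue drained and stop requested
                continue
            if item is STOP:
                break            # sentinel reached: everything before it was handled
            print('flushing', item)  # stand-in for sending a batch to the broker

    q = queue.Queue()
    stop_event = threading.Event()
    t = threading.Thread(target=worker, args=(q, stop_event))
    t.start()

    q.put('batch-1')
    q.put(STOP)        # let the worker finish what was queued first
    stop_event.set()   # belt-and-braces signal, as in the new stop()
    t.join()           # no timeout: blocks until the worker has exited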
test/test_producer_integration.py

@@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         resp = producer.send_messages(self.topic, self.msg("one"))
         self.assertEqual(len(resp), 0)

-        # wait for the server to report a new highwatermark
-        while self.current_offset(self.topic, partition) == start_offset:
-            time.sleep(0.1)
+        # flush messages
+        producer.stop()

         self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])

-        producer.stop()

     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_message(self):