Producer.stop() now blocks until async thread completes (drop confusing timeout arg)
This commit is contained in:
@@ -415,17 +415,22 @@ class Producer(object):
|
|||||||
raise
|
raise
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
def stop(self, timeout=1):
|
def stop(self):
|
||||||
"""
|
"""
|
||||||
Stop the producer. Optionally wait for the specified timeout before
|
Stop the producer (async mode). Blocks until async thread completes.
|
||||||
forcefully cleaning up.
|
|
||||||
"""
|
"""
|
||||||
|
if not self.async:
|
||||||
|
log.warning("producer.stop() called, but producer is not async")
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.stopped:
|
||||||
|
log.warning("producer.stop() called, but producer is already stopped")
|
||||||
|
return
|
||||||
|
|
||||||
if self.async:
|
if self.async:
|
||||||
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
|
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
|
||||||
self.thread.join(timeout)
|
|
||||||
|
|
||||||
if self.thread.is_alive():
|
|
||||||
self.thread_stop_event.set()
|
self.thread_stop_event.set()
|
||||||
|
self.thread.join()
|
||||||
|
|
||||||
if hasattr(self, '_cleanup_func'):
|
if hasattr(self, '_cleanup_func'):
|
||||||
# Remove cleanup handler now that we've stopped
|
# Remove cleanup handler now that we've stopped
|
||||||
|
|||||||
Reference in New Issue
Block a user