kafka: ensure topics are created

Change-Id: I3fb5ee1c134ca6ea220b204af8f6325eeadf9465
This commit is contained in:
Mehdi Abaakouk 2016-12-13 21:56:03 +01:00
parent c7cdf2d9b7
commit 578a186515
2 changed files with 11 additions and 0 deletions

View File

@ -276,6 +276,11 @@ class KafkaListener(base.PollStyleListener):
self.conn = conn
self.incoming_queue = []
# FIXME(sileht): We do a first poll to ensure we topics are created
# This is a workaround mainly for functional tests, in real life
# this is fine if topics are not created synchroneously
self.poll(5)
@base.batch_poll_helper
def poll(self, timeout=None):
while not self._stopped.is_set():

View File

@ -69,9 +69,15 @@ class ServerThreadHelper(threading.Thread):
self.daemon = True
self._server = server
self._stop_event = threading.Event()
self._start_event = threading.Event()
def start(self):
super(ServerThreadHelper, self).start()
self._start_event.wait()
def run(self):
self._server.start()
self._start_event.set()
self._stop_event.wait()
# Check start() does nothing with a running listener
self._server.start()