Merge "Always complete cache init on reconnect"
This commit is contained in:
commit
81832a12bb
|
@ -763,23 +763,22 @@ class NodepoolTreeCache(abc.ABC):
|
||||||
self._event_queue.put(None)
|
self._event_queue.put(None)
|
||||||
self._playback_queue.put(None)
|
self._playback_queue.put(None)
|
||||||
elif state == KazooState.CONNECTED and not self._stopped:
|
elif state == KazooState.CONNECTED and not self._stopped:
|
||||||
self._ready.clear()
|
|
||||||
self._stop_workers = True
|
|
||||||
self._event_queue.put(None)
|
|
||||||
self._playback_queue.put(None)
|
|
||||||
self.zk.kazoo_client.handler.short_spawn(self._start)
|
self.zk.kazoo_client.handler.short_spawn(self._start)
|
||||||
|
|
||||||
def _cacheListener(self, event):
|
def _cacheListener(self, event):
|
||||||
self._event_queue.put(event)
|
self._event_queue.put(event)
|
||||||
|
|
||||||
def _start(self):
|
def _start(self):
|
||||||
locked = self._init_lock.acquire(blocking=False)
|
with self._init_lock:
|
||||||
if locked:
|
|
||||||
self.log.debug("Initialize cache at %s", self.root)
|
self.log.debug("Initialize cache at %s", self.root)
|
||||||
|
|
||||||
# If we have an event worker (this is a re-init), then way
|
self._ready.clear()
|
||||||
# for it to finish stopping (the session listener should
|
self._stop_workers = True
|
||||||
# have told it to stop).
|
self._event_queue.put(None)
|
||||||
|
self._playback_queue.put(None)
|
||||||
|
|
||||||
|
# If we have an event worker (this is a re-init), then wait
|
||||||
|
# for it to finish stopping.
|
||||||
if self._event_worker:
|
if self._event_worker:
|
||||||
self._event_worker.join()
|
self._event_worker.join()
|
||||||
# Replace the queue since any events from the previous
|
# Replace the queue since any events from the previous
|
||||||
|
@ -814,11 +813,6 @@ class NodepoolTreeCache(abc.ABC):
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error initializing cache at %s", self.root)
|
self.log.exception("Error initializing cache at %s", self.root)
|
||||||
self.zk.kazoo_client.handler.short_spawn(self._start)
|
self.zk.kazoo_client.handler.short_spawn(self._start)
|
||||||
finally:
|
|
||||||
self._init_lock.release()
|
|
||||||
else:
|
|
||||||
self.log.debug("Skipping locked cache initialization at %s",
|
|
||||||
self.root)
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._stopped = True
|
self._stopped = True
|
||||||
|
|
Loading…
Reference in New Issue