Clear tree cache queues on disconnect
If a TreeCache encounters a ZK disconnect, then any events in its event queue are no longer useful and possibly counter-productive. If we process events from before the disconnection after a reconnection, the cache could get into an erroneous state. The cache is designed so that on initial or re-connection, we first establish the watch, and then walk the tree to look for previously existing data. All events we process (whether generated by the watch or by walking the tree) must be in order after the watch was established. To ensure this, we clear the event queues on reconnect and restart the event queue threads. Additionally, this change cleans up handling of the case where the cache key is None (the parser says we're not interested in the object) but the preCacheHook didn't tell us to exit early. In that case, we would previously most likely try to pop the None object from the cache, which would generally be harmless. But we shouldn't be doing that anyway, so add a check for whether we have an object key in the main cache method and ignore the return values from the preCacheHook. Change-Id: I4bbedb19364c894a2033ef8c1d2e439299971a83
This commit is contained in:
parent
173b59abbb
commit
25af998d64
@ -1416,8 +1416,9 @@ class TestTreeCache(tests.DBTestCase):
|
||||
break
|
||||
self.assertEqual(len(cached_node.lock_contenders), 1)
|
||||
|
||||
my_zk._node_cache._ready.clear()
|
||||
my_zk._node_cache._start()
|
||||
my_zk._node_cache._sessionListener(KazooState.LOST)
|
||||
my_zk._node_cache._sessionListener(KazooState.CONNECTED)
|
||||
my_zk._node_cache.ensureReady()
|
||||
my_zk._node_cache._event_queue.join()
|
||||
my_zk._node_cache._playback_queue.join()
|
||||
|
||||
|
@ -748,24 +748,25 @@ class NodepoolTreeCache(abc.ABC):
|
||||
self._ready = threading.Event()
|
||||
self._init_lock = threading.Lock()
|
||||
self._stopped = False
|
||||
self._stop_workers = False
|
||||
self._event_queue = queue.Queue()
|
||||
self._playback_queue = queue.Queue()
|
||||
self._event_worker = threading.Thread(
|
||||
target=self._eventWorker)
|
||||
self._event_worker.daemon = True
|
||||
self._event_worker.start()
|
||||
self._playback_worker = threading.Thread(
|
||||
target=self._playbackWorker)
|
||||
self._playback_worker.daemon = True
|
||||
self._playback_worker.start()
|
||||
self._event_worker = None
|
||||
self._playback_worker = None
|
||||
zk.kazoo_client.add_listener(self._sessionListener)
|
||||
self._start()
|
||||
|
||||
def _sessionListener(self, state):
|
||||
if state == KazooState.LOST:
|
||||
self._ready.clear()
|
||||
self._stop_workers = True
|
||||
self._event_queue.put(None)
|
||||
self._playback_queue.put(None)
|
||||
elif state == KazooState.CONNECTED and not self._stopped:
|
||||
self._ready.clear()
|
||||
self._stop_workers = True
|
||||
self._event_queue.put(None)
|
||||
self._playback_queue.put(None)
|
||||
self.zk.kazoo_client.handler.short_spawn(self._start)
|
||||
|
||||
def _cacheListener(self, event):
|
||||
@ -775,6 +776,34 @@ class NodepoolTreeCache(abc.ABC):
|
||||
locked = self._init_lock.acquire(blocking=False)
|
||||
if locked:
|
||||
self.log.debug("Initialize cache at %s", self.root)
|
||||
|
||||
# If we have an event worker (this is a re-init), then way
|
||||
# for it to finish stopping (the session listener should
|
||||
# have told it to stop).
|
||||
if self._event_worker:
|
||||
self._event_worker.join()
|
||||
# Replace the queue since any events from the previous
|
||||
# session aren't valid.
|
||||
self._event_queue = queue.Queue()
|
||||
# Prepare (but don't start) the new worker.
|
||||
self._event_worker = threading.Thread(
|
||||
target=self._eventWorker)
|
||||
self._event_worker.daemon = True
|
||||
|
||||
if self._playback_worker:
|
||||
self._playback_worker.join()
|
||||
self._playback_queue = queue.Queue()
|
||||
self._playback_worker = threading.Thread(
|
||||
target=self._playbackWorker)
|
||||
self._playback_worker.daemon = True
|
||||
|
||||
# Clear the stop flag and start the workers now that we
|
||||
# are sure that both have stopped and we have cleared the
|
||||
# queues.
|
||||
self._stop_workers = False
|
||||
self._event_worker.start()
|
||||
self._playback_worker.start()
|
||||
|
||||
try:
|
||||
self.zk.kazoo_client.add_watch(
|
||||
self.root, self._cacheListener,
|
||||
@ -832,7 +861,7 @@ class NodepoolTreeCache(abc.ABC):
|
||||
self._cacheListener(event)
|
||||
|
||||
def _eventWorker(self):
|
||||
while not self._stopped:
|
||||
while not (self._stopped or self._stop_workers):
|
||||
event = self._event_queue.get()
|
||||
if event is None:
|
||||
self._event_queue.task_done()
|
||||
@ -885,7 +914,7 @@ class NodepoolTreeCache(abc.ABC):
|
||||
self._playback_queue.put((event, future, key))
|
||||
|
||||
def _playbackWorker(self):
|
||||
while not self._stopped:
|
||||
while not (self._stopped or self._stop_workers):
|
||||
item = self._playback_queue.get()
|
||||
if item is None:
|
||||
self._playback_queue.task_done()
|
||||
@ -942,7 +971,10 @@ class NodepoolTreeCache(abc.ABC):
|
||||
self._cached_paths.discard(event.path)
|
||||
|
||||
# Some caches have special handling for certain sub-objects
|
||||
if self.preCacheHook(event, exists):
|
||||
self.preCacheHook(event, exists)
|
||||
|
||||
# If we don't actually cache this kind of object, return now
|
||||
if key is None:
|
||||
return
|
||||
|
||||
if data:
|
||||
@ -988,9 +1020,6 @@ class NodepoolTreeCache(abc.ABC):
|
||||
Otherwise, it indicates whether or not the EventType would
|
||||
cause the node to exist in ZK.
|
||||
|
||||
If True is returned, then the cache will stop processing the event.
|
||||
Other return values are ignored.
|
||||
|
||||
:param EventType event: The event.
|
||||
:param bool exists: Whether the object exists in ZK.
|
||||
|
||||
@ -1051,8 +1080,7 @@ class ImageCache(NodepoolTreeCache):
|
||||
image.paused = True
|
||||
else:
|
||||
image.paused = False
|
||||
# This event was for a paused path; no further handling necessary
|
||||
return True
|
||||
return
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
if len(key) == 4:
|
||||
@ -1112,8 +1140,7 @@ class NodeCache(NodepoolTreeCache):
|
||||
node.lock_contenders.add(contender)
|
||||
else:
|
||||
node.lock_contenders.discard(contender)
|
||||
# This event was for a lock path; no further handling necessary
|
||||
return True
|
||||
return
|
||||
|
||||
def postCacheHook(self, event, data, stat):
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
|
Loading…
Reference in New Issue
Block a user