Correct node cache lock handling
A logic error in the TreeCache prevented us from correctly re-establishing the state of node locks after a zk connection loss. In such a case, we need to fetch data for every item in the tree to determine whether it exists, but we were only fetching data for cache objects, not ancillary znodes like lock nodes. Change-Id: I4e9faa4981b90a6b1f46d2f0347866e2166d5b7f
This commit is contained in:
parent
511d580a8a
commit
173b59abbb
@ -1401,3 +1401,30 @@ class TestTreeCache(tests.DBTestCase):
|
|||||||
|
|
||||||
if not checked and test['kind']:
|
if not checked and test['kind']:
|
||||||
raise Exception("Unhandled kind %s" % (test['kind']))
|
raise Exception("Unhandled kind %s" % (test['kind']))
|
||||||
|
|
||||||
|
def test_node_cache_lock_handling(self):
|
||||||
|
my_zk = zk.ZooKeeper(self.zk.client, enable_cache=True)
|
||||||
|
node = zk.Node()
|
||||||
|
node.state = zk.BUILDING
|
||||||
|
node.provider = 'rax'
|
||||||
|
my_zk.storeNode(node)
|
||||||
|
my_zk.lockNode(node)
|
||||||
|
|
||||||
|
for _ in iterate_timeout(10, Exception, 'cache to sync', interval=0.1):
|
||||||
|
cached_node = my_zk.getNode(node.id, cached=True, only_cached=True)
|
||||||
|
if len(cached_node.lock_contenders):
|
||||||
|
break
|
||||||
|
self.assertEqual(len(cached_node.lock_contenders), 1)
|
||||||
|
|
||||||
|
my_zk._node_cache._ready.clear()
|
||||||
|
my_zk._node_cache._start()
|
||||||
|
my_zk._node_cache._event_queue.join()
|
||||||
|
my_zk._node_cache._playback_queue.join()
|
||||||
|
|
||||||
|
self.assertEqual(len(cached_node.lock_contenders), 1)
|
||||||
|
my_zk.unlockNode(node)
|
||||||
|
|
||||||
|
for _ in iterate_timeout(10, Exception, 'cache to sync', interval=0.1):
|
||||||
|
if not len(cached_node.lock_contenders):
|
||||||
|
break
|
||||||
|
self.assertEqual(len(cached_node.lock_contenders), 0)
|
||||||
|
@ -835,6 +835,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
event = self._event_queue.get()
|
event = self._event_queue.get()
|
||||||
if event is None:
|
if event is None:
|
||||||
|
self._event_queue.task_done()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
qsize = self._event_queue.qsize()
|
qsize = self._event_queue.qsize()
|
||||||
@ -849,6 +850,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
self._handleCacheEvent(event)
|
self._handleCacheEvent(event)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error handling event %s:", event)
|
self.log.exception("Error handling event %s:", event)
|
||||||
|
self._event_queue.task_done()
|
||||||
|
|
||||||
def _handleCacheEvent(self, event):
|
def _handleCacheEvent(self, event):
|
||||||
# Ignore root node since we don't maintain a cached object for
|
# Ignore root node since we don't maintain a cached object for
|
||||||
@ -869,9 +871,11 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
fetch = False
|
fetch = False
|
||||||
|
|
||||||
key = self.parsePath(event.path)
|
key = self.parsePath(event.path)
|
||||||
if key is None:
|
if key is None and event.type != EventType.NONE:
|
||||||
# The cache doesn't care about this path, so we don't need
|
# The cache doesn't care about this path, so we don't need
|
||||||
# to fetch.
|
# to fetch (unless the type is none (re-initialization) in
|
||||||
|
# which case we always need to fetch in order to determine
|
||||||
|
# existence).
|
||||||
fetch = False
|
fetch = False
|
||||||
|
|
||||||
if fetch:
|
if fetch:
|
||||||
@ -884,6 +888,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
item = self._playback_queue.get()
|
item = self._playback_queue.get()
|
||||||
if item is None:
|
if item is None:
|
||||||
|
self._playback_queue.task_done()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
qsize = self._playback_queue.qsize()
|
qsize = self._playback_queue.qsize()
|
||||||
@ -900,6 +905,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||||||
self._handlePlayback(event, future, key)
|
self._handlePlayback(event, future, key)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error playing back event %s:", event)
|
self.log.exception("Error playing back event %s:", event)
|
||||||
|
self._playback_queue.task_done()
|
||||||
|
|
||||||
def _handlePlayback(self, event, future, key):
|
def _handlePlayback(self, event, future, key):
|
||||||
self.event_log.debug("Cache playback event %s", event)
|
self.event_log.debug("Cache playback event %s", event)
|
||||||
|
Loading…
Reference in New Issue
Block a user