Merge pull request #398 from tonyseek/feature/treecache-recipe
New recipe proposal: TreeCache
This commit is contained in:
commit
b1f3d618a2
@ -16,6 +16,7 @@ organized alphabetically by module name.
|
||||
api/interfaces
|
||||
api/protocol/states
|
||||
api/recipe/barrier
|
||||
api/recipe/cache
|
||||
api/recipe/counter
|
||||
api/recipe/election
|
||||
api/recipe/lease
|
||||
|
26
docs/api/recipe/cache.rst
Normal file
26
docs/api/recipe/cache.rst
Normal file
@ -0,0 +1,26 @@
|
||||
.. _cache_module:
|
||||
|
||||
:mod:`kazoo.recipe.cache`
|
||||
----------------------------
|
||||
|
||||
.. automodule:: kazoo.recipe.cache
|
||||
|
||||
Public API
|
||||
++++++++++
|
||||
|
||||
.. autoclass:: TreeCache
|
||||
|
||||
.. automethod:: start
|
||||
.. automethod:: close
|
||||
.. automethod:: listen
|
||||
.. automethod:: listen_fault
|
||||
.. automethod:: get_data
|
||||
.. automethod:: get_children
|
||||
|
||||
.. autoclass:: TreeEvent
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
||||
.. autoclass:: NodeData
|
||||
:members:
|
||||
:show-inheritance:
|
389
kazoo/recipe/cache.py
Normal file
389
kazoo/recipe/cache.py
Normal file
@ -0,0 +1,389 @@
|
||||
"""TreeCache
|
||||
|
||||
:Maintainer: Jiangge Zhang <tonyseek@gmail.com>
|
||||
:Maintainer: Haochuan Guo <guohaochuan@gmail.com>
|
||||
:Maintainer: Tianwen Zhang <mail2tevin@gmail.com>
|
||||
:Status: Alpha
|
||||
|
||||
A port of the Apache Curator's TreeCache recipe. It builds an in-memory cache
|
||||
of a subtree in ZooKeeper and keeps it up-to-date.
|
||||
|
||||
See also: http://curator.apache.org/curator-recipes/tree-cache.html
|
||||
"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import logging
|
||||
import contextlib
|
||||
import functools
|
||||
import operator
|
||||
|
||||
from kazoo.exceptions import NoNodeError, KazooException
|
||||
from kazoo.protocol.states import KazooState, EventType
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TreeCache(object):
|
||||
"""The cache of a ZooKeeper subtree.
|
||||
|
||||
:param client: A :class:`~kazoo.client.KazooClient` instance.
|
||||
:param path: The root path of subtree.
|
||||
"""
|
||||
|
||||
STATE_LATENT = 0
|
||||
STATE_STARTED = 1
|
||||
STATE_CLOSED = 2
|
||||
|
||||
def __init__(self, client, path):
|
||||
self._client = client
|
||||
self._root = TreeNode.make_root(self, path)
|
||||
self._state = self.STATE_LATENT
|
||||
self._outstanding_ops = 0
|
||||
self._is_initialized = False
|
||||
self._error_listeners = []
|
||||
self._event_listeners = []
|
||||
|
||||
def start(self):
|
||||
"""Starts the cache.
|
||||
|
||||
The cache is not started automatically. You must call this method.
|
||||
|
||||
After a cache started, all changes of subtree will be synchronized
|
||||
from the ZooKeeper server. Events will be fired for those activity.
|
||||
|
||||
See also :meth:`~TreeCache.listen`.
|
||||
|
||||
.. note::
|
||||
|
||||
This method is not thread safe.
|
||||
"""
|
||||
if self._state == self.STATE_LATENT:
|
||||
self._state = self.STATE_STARTED
|
||||
elif self._state == self.STATE_CLOSED:
|
||||
raise KazooException('already closed')
|
||||
else:
|
||||
raise KazooException('already started')
|
||||
|
||||
self._client.add_listener(self._session_watcher)
|
||||
self._client.ensure_path(self._root._path)
|
||||
|
||||
if self._client.connected:
|
||||
self._root.on_created()
|
||||
|
||||
def close(self):
|
||||
"""Closes the cache.
|
||||
|
||||
A closed cache was detached from ZooKeeper's changes. And all nodes
|
||||
will be invalidated.
|
||||
|
||||
Once a tree cache was closed, it could not be started again. You should
|
||||
only close a tree cache while you want to recycle it.
|
||||
|
||||
.. note::
|
||||
|
||||
This method is not thread safe.
|
||||
"""
|
||||
if self._state == self.STATE_STARTED:
|
||||
self._state = self.STATE_CLOSED
|
||||
self._client.remove_listener(self._session_watcher)
|
||||
with handle_exception(self._error_listeners):
|
||||
self._root.on_deleted()
|
||||
|
||||
def listen(self, listener):
|
||||
"""Registers a function to listen the cache events.
|
||||
|
||||
The cache events are changes of local data. They are delivered from
|
||||
watching notifications in ZooKeeper session.
|
||||
|
||||
This method can be use as a decorator.
|
||||
|
||||
:param listener: A callable object which accepting a
|
||||
:class:`~kazoo.recipe.cache.TreeEvent` instance as
|
||||
its argument.
|
||||
"""
|
||||
self._event_listeners.append(listener)
|
||||
return listener
|
||||
|
||||
def listen_fault(self, listener):
|
||||
"""Registers a function to listen the exceptions.
|
||||
|
||||
It is possible to meet some exceptions during the cache running. You
|
||||
could specific handlers for them.
|
||||
|
||||
This method can be use as a decorator.
|
||||
|
||||
:param listener: A callable object which accepting an exception as its
|
||||
argument.
|
||||
"""
|
||||
self._error_listeners.append(listener)
|
||||
return listener
|
||||
|
||||
def get_data(self, path, default=None):
|
||||
"""Gets data of a node from cache.
|
||||
|
||||
:param path: The absolute path string.
|
||||
:param default: The default value which will be returned if the node
|
||||
does not exist.
|
||||
:raises ValueError: If the path is outside of this subtree.
|
||||
:returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
|
||||
"""
|
||||
node = self._find_node(path)
|
||||
return default if node is None else node._data
|
||||
|
||||
def get_children(self, path, default=None):
|
||||
"""Gets node children list from in-memory snapshot.
|
||||
|
||||
:param path: The absolute path string.
|
||||
:param default: The default value which will be returned if the node
|
||||
does not exist.
|
||||
:raises ValueError: If the path is outside of this subtree.
|
||||
:returns: The :class:`frozenset` which including children names.
|
||||
"""
|
||||
node = self._find_node(path)
|
||||
return default if node is None else frozenset(node._children)
|
||||
|
||||
def _find_node(self, path):
|
||||
if not path.startswith(self._root._path):
|
||||
raise ValueError('outside of tree')
|
||||
striped_path = path[len(self._root._path):].strip('/')
|
||||
splited_path = [p for p in striped_path.split('/') if p]
|
||||
current_node = self._root
|
||||
for node_name in splited_path:
|
||||
if node_name not in current_node._children:
|
||||
return
|
||||
current_node = current_node._children[node_name]
|
||||
return current_node
|
||||
|
||||
def _publish_event(self, event_type, event_data=None):
|
||||
event = TreeEvent.make(event_type, event_data)
|
||||
if self._state != self.STATE_CLOSED:
|
||||
logger.debug('public event: %r', event)
|
||||
self._in_background(self._do_publish_event, event)
|
||||
|
||||
def _do_publish_event(self, event):
|
||||
for listener in self._event_listeners:
|
||||
with handle_exception(self._error_listeners):
|
||||
listener(event)
|
||||
|
||||
def _in_background(self, func, *args, **kwargs):
|
||||
self._client.handler.callback_queue.put(lambda: func(*args, **kwargs))
|
||||
|
||||
def _session_watcher(self, state):
|
||||
if state == KazooState.SUSPENDED:
|
||||
self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
|
||||
elif state == KazooState.CONNECTED:
|
||||
with handle_exception(self._error_listeners):
|
||||
self._root.on_reconnected()
|
||||
self._publish_event(TreeEvent.CONNECTION_RECONNECTED)
|
||||
elif state == KazooState.LOST:
|
||||
self._is_initialized = False
|
||||
self._publish_event(TreeEvent.CONNECTION_LOST)
|
||||
|
||||
|
||||
class TreeNode(object):
|
||||
"""The tree node record.
|
||||
|
||||
:param tree: A :class:`~kazoo.recipe.cache.TreeCache` instance.
|
||||
:param path: The path of current node.
|
||||
:param parent: The parent node reference. ``None`` for root node.
|
||||
"""
|
||||
|
||||
__slots__ = ('_tree', '_path', '_parent', '_depth', '_children', '_state',
|
||||
'_data')
|
||||
|
||||
STATE_PENDING = 0
|
||||
STATE_LIVE = 1
|
||||
STATE_DEAD = 2
|
||||
|
||||
def __init__(self, tree, path, parent):
|
||||
self._tree = tree
|
||||
self._path = path
|
||||
self._parent = parent
|
||||
self._depth = parent._depth + 1 if parent else 0
|
||||
self._children = {}
|
||||
self._state = self.STATE_PENDING
|
||||
self._data = None
|
||||
|
||||
@classmethod
|
||||
def make_root(cls, tree, path):
|
||||
return cls(tree, path, None)
|
||||
|
||||
def on_reconnected(self):
|
||||
self._refresh()
|
||||
for child in self._children.values():
|
||||
child.on_reconnected()
|
||||
|
||||
def on_created(self):
|
||||
self._refresh()
|
||||
|
||||
def on_deleted(self):
|
||||
old_children, self._children = self._children, {}
|
||||
old_data, self._data = self._data, None
|
||||
|
||||
for old_child in old_children.values():
|
||||
old_child.on_deleted()
|
||||
|
||||
if self._tree._state == self._tree.STATE_CLOSED:
|
||||
return
|
||||
|
||||
old_state, self._state = self._state, self.STATE_DEAD
|
||||
if old_state == self.STATE_LIVE:
|
||||
self._publish_event(TreeEvent.NODE_REMOVED, old_data)
|
||||
|
||||
if self._parent is None:
|
||||
self._call_client('exists', self._path) # root node
|
||||
else:
|
||||
child = self._path[len(self._parent._path) + 1:]
|
||||
if self._parent._children.get(child) is self:
|
||||
del self._parent._children[child]
|
||||
|
||||
def _publish_event(self, *args, **kwargs):
|
||||
return self._tree._publish_event(*args, **kwargs)
|
||||
|
||||
def _refresh(self):
|
||||
self._refresh_data()
|
||||
self._refresh_children()
|
||||
|
||||
def _refresh_data(self):
|
||||
self._call_client('get', self._path)
|
||||
|
||||
def _refresh_children(self):
|
||||
# TODO max-depth checking support
|
||||
self._call_client('get_children', self._path)
|
||||
|
||||
def _call_client(self, method_name, path, *args):
|
||||
self._tree._outstanding_ops += 1
|
||||
callback = functools.partial(
|
||||
self._tree._in_background, self._process_result,
|
||||
method_name, path)
|
||||
kwargs = {'watch': self._process_watch}
|
||||
method = getattr(self._tree._client, method_name + '_async')
|
||||
method(path, *args, **kwargs).rawlink(callback)
|
||||
|
||||
def _process_watch(self, watched_event):
|
||||
logger.debug('process_watch: %r', watched_event)
|
||||
with handle_exception(self._tree._error_listeners):
|
||||
if watched_event.type == EventType.CREATED:
|
||||
assert self._parent is None, 'unexpected CREATED on non-root'
|
||||
self.on_created()
|
||||
elif watched_event.type == EventType.DELETED:
|
||||
self.on_deleted()
|
||||
elif watched_event.type == EventType.CHANGED:
|
||||
self._refresh_data()
|
||||
elif watched_event.type == EventType.CHILD:
|
||||
self._refresh_children()
|
||||
|
||||
def _process_result(self, method_name, path, result):
|
||||
logger.debug('process_result: %s %s', method_name, path)
|
||||
if method_name == 'exists':
|
||||
assert self._parent is None, 'unexpected EXISTS on non-root'
|
||||
# the value of result will be set with `None` if node not exists.
|
||||
if result.get() is not None:
|
||||
if self._state == self.STATE_DEAD:
|
||||
self._state = self.STATE_PENDING
|
||||
self.on_created()
|
||||
elif method_name == 'get_children':
|
||||
try:
|
||||
children = result.get()
|
||||
except NoNodeError:
|
||||
self.on_deleted()
|
||||
else:
|
||||
for child in sorted(children):
|
||||
full_path = os.path.join(path, child)
|
||||
if child not in self._children:
|
||||
node = TreeNode(self._tree, full_path, self)
|
||||
self._children[child] = node
|
||||
node.on_created()
|
||||
elif method_name == 'get':
|
||||
try:
|
||||
data, stat = result.get()
|
||||
except NoNodeError:
|
||||
self.on_deleted()
|
||||
else:
|
||||
old_data, self._data = (
|
||||
self._data, NodeData.make(path, data, stat))
|
||||
|
||||
old_state, self._state = self._state, self.STATE_LIVE
|
||||
if old_state == self.STATE_LIVE:
|
||||
if old_data is None or old_data.stat.mzxid != stat.mzxid:
|
||||
self._publish_event(TreeEvent.NODE_UPDATED, self._data)
|
||||
else:
|
||||
self._publish_event(TreeEvent.NODE_ADDED, self._data)
|
||||
else: # pragma: no cover
|
||||
logger.warning('unknown operation %s', method_name)
|
||||
self._tree._outstanding_ops -= 1
|
||||
return
|
||||
|
||||
self._tree._outstanding_ops -= 1
|
||||
if self._tree._outstanding_ops == 0 and not self._tree._is_initialized:
|
||||
self._tree._is_initialized = True
|
||||
self._publish_event(TreeEvent.INITIALIZED)
|
||||
|
||||
|
||||
class TreeEvent(tuple):
|
||||
"""The immutable event tuple of cache."""
|
||||
|
||||
NODE_ADDED = 0
|
||||
NODE_UPDATED = 1
|
||||
NODE_REMOVED = 2
|
||||
CONNECTION_SUSPENDED = 3
|
||||
CONNECTION_RECONNECTED = 4
|
||||
CONNECTION_LOST = 5
|
||||
INITIALIZED = 6
|
||||
|
||||
#: An enumerate integer to indicate event type.
|
||||
event_type = property(operator.itemgetter(0))
|
||||
|
||||
#: A :class:`~kazoo.recipe.cache.NodeData` instance.
|
||||
event_data = property(operator.itemgetter(1))
|
||||
|
||||
@classmethod
|
||||
def make(cls, event_type, event_data):
|
||||
"""Creates a new TreeEvent tuple.
|
||||
|
||||
:returns: A :class:`~kazoo.recipe.cache.TreeEvent` instance.
|
||||
"""
|
||||
assert event_type in (
|
||||
cls.NODE_ADDED, cls.NODE_UPDATED, cls.NODE_REMOVED,
|
||||
cls.CONNECTION_SUSPENDED, cls.CONNECTION_RECONNECTED,
|
||||
cls.CONNECTION_LOST, cls.INITIALIZED)
|
||||
return cls((event_type, event_data))
|
||||
|
||||
|
||||
class NodeData(tuple):
|
||||
"""The immutable node data tuple of cache."""
|
||||
|
||||
#: The absolute path string of current node.
|
||||
path = property(operator.itemgetter(0))
|
||||
|
||||
#: The bytes data of current node.
|
||||
data = property(operator.itemgetter(1))
|
||||
|
||||
#: The stat information of current node.
|
||||
stat = property(operator.itemgetter(2))
|
||||
|
||||
@classmethod
|
||||
def make(cls, path, data, stat):
|
||||
"""Creates a new NodeData tuple.
|
||||
|
||||
:returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
|
||||
"""
|
||||
return cls((path, data, stat))
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def handle_exception(listeners):
|
||||
try:
|
||||
yield
|
||||
except Exception as e:
|
||||
logger.debug('processing error: %r', e)
|
||||
for listener in listeners:
|
||||
try:
|
||||
listener(e)
|
||||
except: # pragma: no cover
|
||||
logger.exception('Exception handling exception') # oops
|
||||
else:
|
||||
logger.exception('No listener to process %r', e)
|
278
kazoo/tests/test_cache.py
Normal file
278
kazoo/tests/test_cache.py
Normal file
@ -0,0 +1,278 @@
|
||||
import uuid
|
||||
|
||||
from mock import patch, call, Mock
|
||||
from nose.tools import eq_, ok_, assert_not_equal, raises
|
||||
|
||||
from kazoo.testing import KazooTestCase
|
||||
from kazoo.exceptions import KazooException
|
||||
from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
|
||||
|
||||
|
||||
class KazooTreeCacheTests(KazooTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(KazooTreeCacheTests, self).setUp()
|
||||
self._event_queue = self.client.handler.queue_impl()
|
||||
self._error_queue = self.client.handler.queue_impl()
|
||||
self.path = None
|
||||
self.cache = None
|
||||
|
||||
def tearDown(self):
|
||||
super(KazooTreeCacheTests, self).tearDown()
|
||||
if not self._error_queue.empty():
|
||||
try:
|
||||
raise self._error_queue.get()
|
||||
except FakeException:
|
||||
pass
|
||||
|
||||
def make_cache(self):
|
||||
if self.cache is None:
|
||||
self.path = '/' + uuid.uuid4().hex
|
||||
self.cache = TreeCache(self.client, self.path)
|
||||
self.cache.listen(lambda event: self._event_queue.put(event))
|
||||
self.cache.listen_fault(lambda error: self._error_queue.put(error))
|
||||
self.cache.start()
|
||||
return self.cache
|
||||
|
||||
def wait_cache(self, expect=None, since=None, timeout=10):
|
||||
started = since is None
|
||||
while True:
|
||||
event = self._event_queue.get(timeout=timeout)
|
||||
if started:
|
||||
if expect is not None:
|
||||
eq_(event.event_type, expect)
|
||||
return event
|
||||
if event.event_type == since:
|
||||
started = True
|
||||
if expect is None:
|
||||
return
|
||||
|
||||
def spy_client(self, method_name):
|
||||
method = getattr(self.client, method_name)
|
||||
return patch.object(self.client, method_name, wraps=method)
|
||||
|
||||
def test_start(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
stat = self.client.exists(self.path)
|
||||
eq_(stat.version, 0)
|
||||
|
||||
eq_(self.cache._state, TreeCache.STATE_STARTED)
|
||||
eq_(self.cache._root._state, TreeNode.STATE_LIVE)
|
||||
|
||||
@raises(KazooException)
|
||||
def test_start_started(self):
|
||||
self.make_cache()
|
||||
self.cache.start()
|
||||
|
||||
@raises(KazooException)
|
||||
def test_start_closed(self):
|
||||
self.make_cache()
|
||||
self.cache.start()
|
||||
self.cache.close()
|
||||
self.cache.start()
|
||||
|
||||
def test_close(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
self.client.create(self.path + '/foo/bar/baz', makepath=True)
|
||||
for _ in range(3):
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
|
||||
self.cache.close()
|
||||
|
||||
# nothing should be published since tree closed
|
||||
ok_(self._event_queue.empty())
|
||||
|
||||
# tree should be empty
|
||||
eq_(self.cache._root._children, {})
|
||||
eq_(self.cache._root._data, None)
|
||||
eq_(self.cache._state, TreeCache.STATE_CLOSED)
|
||||
|
||||
# node state should not be changed
|
||||
assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)
|
||||
|
||||
def test_children_operation(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
self.client.create(self.path + '/test_children', b'test_children_1')
|
||||
event = self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_type, TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_data.path, self.path + '/test_children')
|
||||
eq_(event.event_data.data, b'test_children_1')
|
||||
eq_(event.event_data.stat.version, 0)
|
||||
|
||||
self.client.set(self.path + '/test_children', b'test_children_2')
|
||||
event = self.wait_cache(TreeEvent.NODE_UPDATED)
|
||||
eq_(event.event_type, TreeEvent.NODE_UPDATED)
|
||||
eq_(event.event_data.path, self.path + '/test_children')
|
||||
eq_(event.event_data.data, b'test_children_2')
|
||||
eq_(event.event_data.stat.version, 1)
|
||||
|
||||
self.client.delete(self.path + '/test_children')
|
||||
event = self.wait_cache(TreeEvent.NODE_REMOVED)
|
||||
eq_(event.event_type, TreeEvent.NODE_REMOVED)
|
||||
eq_(event.event_data.path, self.path + '/test_children')
|
||||
eq_(event.event_data.data, b'test_children_2')
|
||||
eq_(event.event_data.stat.version, 1)
|
||||
|
||||
def test_subtree_operation(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
self.client.create(self.path + '/foo/bar/baz', makepath=True)
|
||||
for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'):
|
||||
event = self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_type, TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_data.path, self.path + relative_path)
|
||||
eq_(event.event_data.data, b'')
|
||||
eq_(event.event_data.stat.version, 0)
|
||||
|
||||
self.client.delete(self.path + '/foo', recursive=True)
|
||||
for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'):
|
||||
event = self.wait_cache(TreeEvent.NODE_REMOVED)
|
||||
eq_(event.event_type, TreeEvent.NODE_REMOVED)
|
||||
eq_(event.event_data.path, self.path + relative_path)
|
||||
|
||||
def test_get_data(self):
|
||||
cache = self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
|
||||
with patch.object(cache, '_client'): # disable any remote operation
|
||||
eq_(cache.get_data(self.path).data, b'')
|
||||
eq_(cache.get_data(self.path).stat.version, 0)
|
||||
|
||||
eq_(cache.get_data(self.path + '/foo').data, b'')
|
||||
eq_(cache.get_data(self.path + '/foo').stat.version, 0)
|
||||
|
||||
eq_(cache.get_data(self.path + '/foo/bar').data, b'')
|
||||
eq_(cache.get_data(self.path + '/foo/bar').stat.version, 0)
|
||||
|
||||
eq_(cache.get_data(self.path + '/foo/bar/baz').data, b'@')
|
||||
eq_(cache.get_data(self.path + '/foo/bar/baz').stat.version, 0)
|
||||
|
||||
def test_get_children(self):
|
||||
cache = self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
|
||||
with patch.object(cache, '_client'): # disable any remote operation
|
||||
eq_(cache.get_children(self.path + '/foo/bar/baz'), frozenset())
|
||||
eq_(cache.get_children(self.path + '/foo/bar'), frozenset(['baz']))
|
||||
eq_(cache.get_children(self.path + '/foo'), frozenset(['bar']))
|
||||
eq_(cache.get_children(self.path), frozenset(['foo']))
|
||||
|
||||
@raises(ValueError)
|
||||
def test_get_data_out_of_tree(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
self.cache.get_data('/out_of_tree')
|
||||
|
||||
@raises(ValueError)
|
||||
def test_get_children_out_of_tree(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
self.cache.get_children('/out_of_tree')
|
||||
|
||||
def test_get_data_no_node(self):
|
||||
cache = self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
with patch.object(cache, '_client'): # disable any remote operation
|
||||
eq_(cache.get_data(self.path + '/non_exists'), None)
|
||||
|
||||
def test_get_children_no_node(self):
|
||||
cache = self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
with patch.object(cache, '_client'): # disable any remote operation
|
||||
eq_(cache.get_children(self.path + '/non_exists'), None)
|
||||
|
||||
def test_session_reconnected(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
self.client.create(self.path + '/foo')
|
||||
event = self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_data.path, self.path + '/foo')
|
||||
|
||||
with self.spy_client('get_async') as get_data:
|
||||
with self.spy_client('get_children_async') as get_children:
|
||||
# session suspended
|
||||
self.lose_connection(self.client.handler.event_object)
|
||||
self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)
|
||||
|
||||
# There are a serial refreshing operation here. But NODE_ADDED
|
||||
# events will not be raised because the zxid of nodes are the
|
||||
# same during reconnecting.
|
||||
|
||||
# connection restore
|
||||
self.wait_cache(TreeEvent.CONNECTION_RECONNECTED)
|
||||
|
||||
# wait for outstanding operations
|
||||
while self.cache._outstanding_ops > 0:
|
||||
self.client.handler.sleep_func(0.1)
|
||||
|
||||
# inspect in-memory nodes
|
||||
_node_root = self.cache._root
|
||||
_node_foo = self.cache._root._children['foo']
|
||||
|
||||
# make sure that all nodes are refreshed
|
||||
get_data.assert_has_calls([
|
||||
call(self.path, watch=_node_root._process_watch),
|
||||
call(self.path + '/foo', watch=_node_foo._process_watch),
|
||||
], any_order=True)
|
||||
get_children.assert_has_calls([
|
||||
call(self.path, watch=_node_root._process_watch),
|
||||
call(self.path + '/foo', watch=_node_foo._process_watch),
|
||||
], any_order=True)
|
||||
|
||||
def test_root_recreated(self):
|
||||
self.make_cache()
|
||||
self.wait_cache(since=TreeEvent.INITIALIZED)
|
||||
|
||||
# remove root node
|
||||
self.client.delete(self.path)
|
||||
event = self.wait_cache(TreeEvent.NODE_REMOVED)
|
||||
eq_(event.event_type, TreeEvent.NODE_REMOVED)
|
||||
eq_(event.event_data.data, b'')
|
||||
eq_(event.event_data.path, self.path)
|
||||
eq_(event.event_data.stat.version, 0)
|
||||
|
||||
# re-create root node
|
||||
self.client.ensure_path(self.path)
|
||||
event = self.wait_cache(TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_type, TreeEvent.NODE_ADDED)
|
||||
eq_(event.event_data.data, b'')
|
||||
eq_(event.event_data.path, self.path)
|
||||
eq_(event.event_data.stat.version, 0)
|
||||
|
||||
self.assertTrue(
|
||||
self.cache._outstanding_ops >= 0,
|
||||
'unexpected outstanding ops %r' % self.cache._outstanding_ops)
|
||||
|
||||
def test_exception_handler(self):
|
||||
error_value = FakeException()
|
||||
error_handler = Mock()
|
||||
|
||||
with patch.object(TreeNode, 'on_deleted') as on_deleted:
|
||||
on_deleted.side_effect = [error_value]
|
||||
|
||||
self.make_cache()
|
||||
self.cache.listen_fault(error_handler)
|
||||
|
||||
self.cache.close()
|
||||
error_handler.assert_called_once_with(error_value)
|
||||
|
||||
|
||||
class FakeException(Exception):
|
||||
pass
|
Loading…
Reference in New Issue
Block a user