Use asynchronous fetch operations for tree cache
The TreeCache can be a bit of a bottleneck since it is centralizing in a single thread fetch operations which may have previously happened in different threads. It also may in some cases fetch more data than we did previously since it will update all objects in memory as they are updated in ZK regardless of whether nodepool is interested in them at the moment. Or at least, it may concentrate those fetches in a smaller timespan. To address the bottleneck, this change uses asynchronous fetches from ZooKeeper. The background worker which previously updated the cache in response to events from ZK is split into two threads. The first will decide whether or not a fetch for a given event is necessary, and then fire off the async fetch operation if so. It then pushes the future for that operation, along with other event information into a second queue. A second background thread (called the "playback" worker) will pop events from that queue, still in order, and await the future. This ensures all cache operations are still ordered, while multiple fetch operations can be outstanding in parallel. This relies even more heavily on the path parsing implemented by the three caches to be correct. In particular, we want to make sure that we don't accidentally fetch a znode if we're just dealing with a lock node event. The previous code sequenced the checks so that we would handle the lock node event and short circuit before deciding whether to fetch. The new code reverses that, which means that it is paramount that we don't accidentally have the path parsing return a match on a lock node. The cache would function correctly, but it would silently perform far more operations than necessary, so such an error would likely go unnoticed for some time. To prevent this, explicit testing for path parsing is added. Change-Id: I6ef10c724c3993ee565510ab917dce64d2e3d3f9
This commit is contained in:
parent
386fdc52c7
commit
acbd318084
|
@ -1261,3 +1261,138 @@ class TestTreeCache(tests.DBTestCase):
|
|||
if 'Queue size for cache' in str(line):
|
||||
found = True
|
||||
self.assertTrue(found)
|
||||
|
||||
def test_tree_cache_parsing(self):
|
||||
my_zk = zk.ZooKeeper(self.zk.client, enable_cache=True)
|
||||
# Node paths
|
||||
self.assertEqual(my_zk._node_cache.parsePath(
|
||||
'/nodepool/nodes/0000000001'),
|
||||
('0000000001',))
|
||||
self.assertIsNone(my_zk._node_cache.parsePath(
|
||||
'/nodepool/nodes/0000000001/lock'))
|
||||
self.assertIsNone(my_zk._node_cache.parsePath(
|
||||
'/nodepool/nodes/0000000001/lock/foo'))
|
||||
|
||||
self.assertEqual(my_zk._parseNodePath(
|
||||
'/nodepool/nodes/0000000001'),
|
||||
('0000000001',))
|
||||
self.assertIsNone(my_zk._parseNodePath(
|
||||
'/nodepool/nodes/0000000001/lock'))
|
||||
self.assertIsNone(my_zk._parseNodePath(
|
||||
'/nodepool/nodes/0000000001/lock/foo'))
|
||||
|
||||
self.assertIsNone(my_zk._parseNodeLockPath(
|
||||
'/nodepool/nodes/0000000001'))
|
||||
self.assertIsNone(my_zk._parseNodeLockPath(
|
||||
'/nodepool/nodes/0000000001/lock'))
|
||||
self.assertEqual(my_zk._parseNodeLockPath(
|
||||
'/nodepool/nodes/0000000001/lock/foo'),
|
||||
('0000000001', 'foo'))
|
||||
|
||||
# Request paths
|
||||
self.assertEqual(my_zk._request_cache.parsePath(
|
||||
'/nodepool/requests/0000000001'),
|
||||
('0000000001',))
|
||||
self.assertIsNone(my_zk._request_cache.parsePath(
|
||||
'/nodepool/requests/0000000001/lock'))
|
||||
self.assertIsNone(my_zk._request_cache.parsePath(
|
||||
'/nodepool/requests/0000000001/lock/foo'))
|
||||
|
||||
self.assertEqual(my_zk._parseRequestPath(
|
||||
'/nodepool/requests/0000000001'),
|
||||
('0000000001',))
|
||||
self.assertIsNone(my_zk._parseRequestPath(
|
||||
'/nodepool/requests/0000000001/lock'))
|
||||
self.assertIsNone(my_zk._parseRequestPath(
|
||||
'/nodepool/requests/0000000001/lock/foo'))
|
||||
|
||||
# Request cache doesn't cache lock contenders, so no lock path
|
||||
# parsing test
|
||||
|
||||
# Image paths
|
||||
|
||||
test_paths = [
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/providers/PROVIDER/images/0000000002',
|
||||
result=('NAME', '0000000001', 'PROVIDER', '0000000002'),
|
||||
kind='upload'),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/providers/PROVIDER/images/0000000002/lock',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/providers/PROVIDER/images/0000000002/lock/foo',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/providers/PROVIDER/images',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/providers/PROVIDER',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/providers',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001',
|
||||
result=('NAME', '0000000001'),
|
||||
kind='build'),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/lock',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds/0000000001'
|
||||
'/lock/foo',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/builds',
|
||||
result=None,
|
||||
kind=None),
|
||||
dict(path='/nodepool/images/NAME/pause',
|
||||
result=('NAME',),
|
||||
kind='image pause'),
|
||||
dict(path='/nodepool/images/NAME',
|
||||
result=('NAME',),
|
||||
kind='image'),
|
||||
]
|
||||
|
||||
for test in test_paths:
|
||||
res = my_zk._image_cache.parsePath(test['path'])
|
||||
if test['result'] and test['kind'] != 'image pause':
|
||||
self.assertEqual(test['result'], res)
|
||||
else:
|
||||
self.assertIsNone(res)
|
||||
|
||||
checked = False
|
||||
res = my_zk._parseImageUploadPath(test['path'])
|
||||
if test['kind'] == 'upload':
|
||||
checked = True
|
||||
self.assertEqual(test['result'], res)
|
||||
else:
|
||||
self.assertIsNone(res)
|
||||
|
||||
res = my_zk._parseImageBuildPath(test['path'])
|
||||
if test['kind'] == 'build':
|
||||
checked = True
|
||||
self.assertEqual(test['result'], res)
|
||||
else:
|
||||
self.assertIsNone(res)
|
||||
|
||||
res = my_zk._parseImagePausePath(test['path'])
|
||||
if test['kind'] == 'image pause':
|
||||
checked = True
|
||||
self.assertEqual(test['result'], res)
|
||||
else:
|
||||
self.assertIsNone(res)
|
||||
|
||||
res = my_zk._parseImagePath(test['path'])
|
||||
if test['kind'] == 'image':
|
||||
checked = True
|
||||
self.assertEqual(test['result'], res)
|
||||
else:
|
||||
self.assertIsNone(res)
|
||||
|
||||
if not checked and test['kind']:
|
||||
raise Exception("Unhandled kind %s" % (test['kind']))
|
||||
|
|
|
@ -741,30 +741,26 @@ class NodepoolTreeCache(abc.ABC):
|
|||
def __init__(self, zk, root):
|
||||
self.zk = zk
|
||||
self.root = root
|
||||
self._last_qsize_warning = time.monotonic()
|
||||
self._last_event_warning = time.monotonic()
|
||||
self._last_playblack_warning = time.monotonic()
|
||||
self._cached_objects = {}
|
||||
self._cached_paths = set()
|
||||
self._ready = threading.Event()
|
||||
self._init_lock = threading.Lock()
|
||||
self._stopped = False
|
||||
self._queue = queue.Queue()
|
||||
self._background_thread = threading.Thread(
|
||||
target=self._backgroundWorker)
|
||||
self._background_thread.daemon = True
|
||||
self._background_thread.start()
|
||||
self._event_queue = queue.Queue()
|
||||
self._playback_queue = queue.Queue()
|
||||
self._event_worker = threading.Thread(
|
||||
target=self._eventWorker)
|
||||
self._event_worker.daemon = True
|
||||
self._event_worker.start()
|
||||
self._playback_worker = threading.Thread(
|
||||
target=self._playbackWorker)
|
||||
self._playback_worker.daemon = True
|
||||
self._playback_worker.start()
|
||||
zk.kazoo_client.add_listener(self._sessionListener)
|
||||
self._start()
|
||||
|
||||
def _backgroundWorker(self):
|
||||
while not self._stopped:
|
||||
event = self._queue.get()
|
||||
if event is None:
|
||||
continue
|
||||
try:
|
||||
self._handleCacheEvent(event)
|
||||
except Exception:
|
||||
self.log.exception("Error handling event %s:", event)
|
||||
|
||||
def _sessionListener(self, state):
|
||||
if state == KazooState.LOST:
|
||||
self._ready.clear()
|
||||
|
@ -773,14 +769,7 @@ class NodepoolTreeCache(abc.ABC):
|
|||
self.zk.kazoo_client.handler.short_spawn(self._start)
|
||||
|
||||
def _cacheListener(self, event):
|
||||
self._queue.put(event)
|
||||
qsize = self._queue.qsize()
|
||||
if qsize > self.qsize_warning_threshold:
|
||||
now = time.monotonic()
|
||||
if now - self._last_qsize_warning > 60:
|
||||
self.log.warning("Queue size for cache at %s is %s",
|
||||
self.root, qsize)
|
||||
self._last_qsize_warning = now
|
||||
self._event_queue.put(event)
|
||||
|
||||
def _start(self):
|
||||
locked = self._init_lock.acquire(blocking=False)
|
||||
|
@ -803,7 +792,8 @@ class NodepoolTreeCache(abc.ABC):
|
|||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self._queue.put(None)
|
||||
self._event_queue.put(None)
|
||||
self._playback_queue.put(None)
|
||||
|
||||
def _walkTree(self, root=None, seen_paths=None):
|
||||
# Recursively walk the tree and emit fake changed events for
|
||||
|
@ -841,27 +831,100 @@ class NodepoolTreeCache(abc.ABC):
|
|||
path)
|
||||
self._cacheListener(event)
|
||||
|
||||
def _handleCacheEvent(self, event):
|
||||
self.event_log.debug("Cache event %s", event)
|
||||
def _eventWorker(self):
|
||||
while not self._stopped:
|
||||
event = self._event_queue.get()
|
||||
if event is None:
|
||||
continue
|
||||
|
||||
data, stat = None, None
|
||||
# The cache is being (re-)initialized. Since this happens out
|
||||
# of sequence with the normal watch events, we can't be sure
|
||||
# whether the node still exists or not by the time we process
|
||||
# it. Later cache watch events may supercede this (for
|
||||
# example, we may process a NONE event here which we interpret
|
||||
# as a delete which may be followed by a normal delete event.
|
||||
# That case, and any other variations should be anticipated.
|
||||
qsize = self._event_queue.qsize()
|
||||
if qsize > self.qsize_warning_threshold:
|
||||
now = time.monotonic()
|
||||
if now - self._last_event_warning > 60:
|
||||
self.log.warning("Event queue size for cache at %s is %s",
|
||||
self.root, qsize)
|
||||
self._last_event_warning = now
|
||||
|
||||
try:
|
||||
self._handleCacheEvent(event)
|
||||
except Exception:
|
||||
self.log.exception("Error handling event %s:", event)
|
||||
|
||||
def _handleCacheEvent(self, event):
|
||||
# Ignore root node since we don't maintain a cached object for
|
||||
# it (all cached objects are under the root in our tree
|
||||
# caches).
|
||||
if event.path == self.root:
|
||||
return
|
||||
|
||||
# Start by assuming we need to fetch data for the event.
|
||||
fetch = True
|
||||
if event.type == EventType.NONE:
|
||||
if event.path is None:
|
||||
# We're probably being told of a connection change; ignore.
|
||||
return
|
||||
elif (event.type == EventType.DELETED):
|
||||
# If this is a normal deleted event, we don't need to
|
||||
# fetch anything.
|
||||
fetch = False
|
||||
|
||||
key = self.parsePath(event.path)
|
||||
if key is None:
|
||||
# The cache doesn't care about this path, so we don't need
|
||||
# to fetch.
|
||||
fetch = False
|
||||
|
||||
if fetch:
|
||||
future = self.zk.kazoo_client.get_async(event.path)
|
||||
else:
|
||||
future = None
|
||||
self._playback_queue.put((event, future, key))
|
||||
|
||||
def _playbackWorker(self):
|
||||
while not self._stopped:
|
||||
item = self._playback_queue.get()
|
||||
if item is None:
|
||||
continue
|
||||
|
||||
qsize = self._playback_queue.qsize()
|
||||
if qsize > self.qsize_warning_threshold:
|
||||
now = time.monotonic()
|
||||
if now - self._last_event_warning > 60:
|
||||
self.log.warning(
|
||||
"Playback queue size for cache at %s is %s",
|
||||
self.root, qsize)
|
||||
self._last_playback_warning = now
|
||||
|
||||
event, future, key = item
|
||||
try:
|
||||
data, stat = self.zk.kazoo_client.get(event.path)
|
||||
self._handlePlayback(event, future, key)
|
||||
except Exception:
|
||||
self.log.exception("Error playing back event %s:", event)
|
||||
|
||||
def _handlePlayback(self, event, future, key):
|
||||
self.event_log.debug("Cache playback event %s", event)
|
||||
exists = None
|
||||
data, stat = None, None
|
||||
|
||||
if future:
|
||||
try:
|
||||
data, stat = future.get()
|
||||
exists = True
|
||||
except kze.NoNodeError:
|
||||
exists = False
|
||||
elif (event.type in (EventType.CREATED, EventType.CHANGED)):
|
||||
|
||||
# We set "exists" above in case of cache re-initialization,
|
||||
# which happens out of sequence with the normal watch events.
|
||||
# and we can't be sure whether the node still exists or not by
|
||||
# the time we process it. Later cache watch events may
|
||||
# supercede this (for example, we may process a NONE event
|
||||
# here which we interpret as a delete which may be followed by
|
||||
# a normal delete event. That case, and any other variations
|
||||
# should be anticipated.
|
||||
|
||||
# If the event tells us whether the node exists, prefer that
|
||||
# value, otherwise fallback to what we determined above.
|
||||
if (event.type in (EventType.CREATED, EventType.CHANGED)):
|
||||
exists = True
|
||||
elif (event.type == EventType.DELETED):
|
||||
exists = False
|
||||
|
@ -872,28 +935,10 @@ class NodepoolTreeCache(abc.ABC):
|
|||
else:
|
||||
self._cached_paths.discard(event.path)
|
||||
|
||||
# Ignore root node since we don't maintain a cached object for
|
||||
# it (all cached objects are under the root in our tree
|
||||
# caches).
|
||||
if event.path == self.root:
|
||||
return
|
||||
|
||||
# Some caches have special handling for certain sub-objects
|
||||
if self.preCacheHook(event, exists):
|
||||
return
|
||||
|
||||
key = self.parsePath(event.path)
|
||||
if key is None:
|
||||
return
|
||||
|
||||
# If we didn't get the data above, and we expect the node to
|
||||
# exist, fetch the data now.
|
||||
if data is None and exists:
|
||||
try:
|
||||
data, stat = self.zk.kazoo_client.get(event.path)
|
||||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
if data:
|
||||
data = self.zk._bytesToDict(data)
|
||||
|
||||
|
|
Loading…
Reference in New Issue