Browse Source

Merge "Add second level cache of nodes"

tags/3.4.0
Zuul 5 months ago
parent
commit
1aac1cc8d2
2 changed files with 80 additions and 26 deletions
  1. 19
    12
      nodepool/tests/unit/test_commands.py
  2. 61
    14
      nodepool/zk.py

+ 19
- 12
nodepool/tests/unit/test_commands.py View File

@@ -39,18 +39,25 @@ class TestNodepoolCMD(tests.DBTestCase):
39 39
     def assert_listed(self, configfile, cmd, col, val, count, col_count=0):
40 40
         log = logging.getLogger("tests.PrettyTableMock")
41 41
         self.patch_argv("-c", configfile, *cmd)
42
-        with mock.patch('prettytable.PrettyTable.add_row') as m_add_row:
43
-            nodepoolcmd.main()
44
-            rows_with_val = 0
45
-            # Find add_rows with the status were looking for
46
-            for args, kwargs in m_add_row.call_args_list:
47
-                row = args[0]
48
-                if col_count:
49
-                    self.assertEquals(len(row), col_count)
50
-                log.debug(row)
51
-                if row[col] == val:
52
-                    rows_with_val += 1
53
-            self.assertEquals(rows_with_val, count)
42
+        for _ in iterate_timeout(10, AssertionError, 'assert listed'):
43
+            try:
44
+                with mock.patch('prettytable.PrettyTable.add_row') as \
45
+                        m_add_row:
46
+                    nodepoolcmd.main()
47
+                    rows_with_val = 0
48
+                    # Find add_rows with the status were looking for
49
+                    for args, kwargs in m_add_row.call_args_list:
50
+                        row = args[0]
51
+                        if col_count:
52
+                            self.assertEquals(len(row), col_count)
53
+                        log.debug(row)
54
+                        if row[col] == val:
55
+                            rows_with_val += 1
56
+                    self.assertEquals(rows_with_val, count)
57
+                break
58
+            except AssertionError:
59
+                # retry
60
+                pass
54 61
 
55 62
     def assert_alien_images_listed(self, configfile, image_cnt, image_id):
56 63
         self.assert_listed(configfile, ['alien-image-list'], 2, image_id,

+ 61
- 14
nodepool/zk.py View File

@@ -22,7 +22,7 @@ from kazoo.client import KazooClient, KazooState
22 22
 from kazoo import exceptions as kze
23 23
 from kazoo.handlers.threading import KazooTimeoutError
24 24
 from kazoo.recipe.lock import Lock
25
-from kazoo.recipe.cache import TreeCache
25
+from kazoo.recipe.cache import TreeCache, TreeEvent
26 26
 
27 27
 from nodepool import exceptions as npe
28 28
 
@@ -702,6 +702,7 @@ class ZooKeeper(object):
702 702
         self._last_retry_log = 0
703 703
         self._node_cache = None
704 704
         self._request_cache = None
705
+        self._cached_nodes = {}
705 706
 
706 707
     # =======================================================================
707 708
     # Private Methods
@@ -893,6 +894,8 @@ class ZooKeeper(object):
893 894
                     self.logConnectionRetryEvent()
894 895
 
895 896
             self._node_cache = TreeCache(self.client, self.NODE_ROOT)
897
+            self._node_cache.listen_fault(self.cacheFaultListener)
898
+            self._node_cache.listen(self.nodeCacheListener)
896 899
             self._node_cache.start()
897 900
 
898 901
             self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
@@ -1780,25 +1783,21 @@ class ZooKeeper(object):
1780 1783
 
1781 1784
         :returns: The node data, or None if the node was not found.
1782 1785
         '''
1783
-        path = self._nodePath(node)
1784
-        data = None
1785
-        stat = None
1786 1786
         if cached:
1787
-            cached_data = self._node_cache.get_data(path)
1788
-            if cached_data:
1789
-                data = cached_data.data
1790
-                stat = cached_data.stat
1787
+            d = self._cached_nodes.get(node)
1788
+            if d:
1789
+                return d
1791 1790
 
1792
-        # If data is empty we either didn't use the cache or the cache didn't
1791
+        # We got here we either didn't use the cache or the cache didn't
1793 1792
         # have the node (yet). Note that even if we use caching we need to
1794 1793
         # do a real query if the cached data is empty because the node data
1795 1794
         # might not be in the cache yet when it's listed by the get_children
1796 1795
         # call.
1797
-        if not data:
1798
-            try:
1799
-                data, stat = self.client.get(path)
1800
-            except kze.NoNodeError:
1801
-                return None
1796
+        try:
1797
+            path = self._nodePath(node)
1798
+            data, stat = self.client.get(path)
1799
+        except kze.NoNodeError:
1800
+            return None
1802 1801
 
1803 1802
         if not data:
1804 1803
             return None
@@ -2060,3 +2059,51 @@ class ZooKeeper(object):
2060 2059
         '''
2061 2060
         for node in provider_nodes:
2062 2061
             self.deleteNode(node)
2062
+
2063
+    def cacheFaultListener(self, e):
2064
+        self.log.exception(e)
2065
+
2066
+    def nodeCacheListener(self, event):
2067
+
2068
+        if hasattr(event.event_data, 'path'):
2069
+            # Ignore root node
2070
+            path = event.event_data.path
2071
+            if path == self.NODE_ROOT:
2072
+                return
2073
+
2074
+            # Ignore lock nodes
2075
+            if '/lock' in path:
2076
+                return
2077
+
2078
+        # Ignore any non-node related events such as connection events here
2079
+        if event.event_type not in (TreeEvent.NODE_ADDED,
2080
+                                    TreeEvent.NODE_UPDATED,
2081
+                                    TreeEvent.NODE_REMOVED):
2082
+            return
2083
+
2084
+        path = event.event_data.path
2085
+        node_id = path.rsplit('/', 1)[1]
2086
+
2087
+        if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
2088
+            # Perform an in-place update of the already cached node if possible
2089
+            d = self._bytesToDict(event.event_data.data)
2090
+            old_node = self._cached_nodes.get(node_id)
2091
+            if old_node:
2092
+                if event.event_data.stat.version <= old_node.stat.version:
2093
+                    # Don't update to older data
2094
+                    return
2095
+                if old_node.lock:
2096
+                    # Don't update a locked node
2097
+                    return
2098
+                old_node.updateFromDict(d)
2099
+                old_node.stat = event.event_data.stat
2100
+            else:
2101
+                node = Node.fromDict(d, node_id)
2102
+                node.stat = event.event_data.stat
2103
+                self._cached_nodes[node_id] = node
2104
+        elif event.event_type == TreeEvent.NODE_REMOVED:
2105
+            try:
2106
+                del self._cached_nodes[node_id]
2107
+            except KeyError:
2108
+                # If it's already gone, don't care
2109
+                pass

Loading…
Cancel
Save