Browse Source

Asynchronously update node statistics

We currently update the node statistics on every node launch or
delete. This cannot use caching at the moment because when the
statistics are updated we might end up pushing slightly outdated
data. If then there is no further update for a longer time we end up
with broken gauges. We already get update events from the node cache
so we can use that to centrally trigger node statistics updates.

This is combined with leader election so there is only a single
launcher that keeps the statistics up to date. This ensures that
the statistics are not cluttered by several launchers each pushing
their own slightly different view into the stats.

As a side effect this reduces the runtime of a test that creates 200
nodes from 100s to 70s on my local machine.

Change-Id: I77c6edc1db45b5b45be1812cf19eea66fdfab014
tags/3.4.0
Tobias Henkel 6 months ago
parent
commit
64487baef0
No account linked to committer's email address
5 changed files with 133 additions and 22 deletions
  1. 0
    1
      nodepool/driver/utils.py
  2. 81
    8
      nodepool/launcher.py
  3. 23
    13
      nodepool/stats.py
  4. 1
    0
      nodepool/tests/__init__.py
  5. 28
    0
      nodepool/zk.py

+ 0
- 1
nodepool/driver/utils.py View File

@@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
94 94
         try:
95 95
             dt = int((time.monotonic() - start_time) * 1000)
96 96
             self.recordLaunchStats(statsd_key, dt)
97
-            self.updateNodeStats(self.zk, self.provider_config)
98 97
         except Exception:
99 98
             self.log.exception("Exception while reporting stats:")
100 99
 

+ 81
- 8
nodepool/launcher.py View File

@@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
46 46
 SUSPEND_WAIT_TIME = 30
47 47
 
48 48
 
49
-class NodeDeleter(threading.Thread, stats.StatsReporter):
49
+class NodeDeleter(threading.Thread):
50 50
     log = logging.getLogger("nodepool.NodeDeleter")
51 51
 
52 52
     def __init__(self, zk, provider_manager, node):
53 53
         threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
54 54
                                   (node.provider, node.external_id))
55
-        stats.StatsReporter.__init__(self)
56 55
         self._zk = zk
57 56
         self._provider_manager = provider_manager
58 57
         self._node = node
@@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
109 108
 
110 109
         self.delete(self._zk, self._provider_manager, self._node, node_exists)
111 110
 
112
-        try:
113
-            self.updateNodeStats(self._zk, self._provider_manager.provider)
114
-        except Exception:
115
-            self.log.exception("Exception while reporting stats:")
116 111
 
117
-
118
-class PoolWorker(threading.Thread):
112
+class PoolWorker(threading.Thread, stats.StatsReporter):
119 113
     '''
120 114
     Class that manages node requests for a single provider pool.
121 115
 
@@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
143 137
         self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
144 138
                                          os.getpid(),
145 139
                                          self.name)
140
+        stats.StatsReporter.__init__(self)
146 141
 
147 142
     # ---------------------------------------------------------------
148 143
     # Private methods
@@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
294 289
             launcher.id = self.launcher_id
295 290
             for prov_cfg in self.nodepool.config.providers.values():
296 291
                 launcher.supported_labels.update(prov_cfg.getSupportedLabels())
292
+            launcher.provider_name = self.provider_name
297 293
             self.zk.registerLauncher(launcher)
298 294
 
295
+            self.updateProviderLimits(
296
+                self.nodepool.config.providers.get(self.provider_name))
297
+
299 298
             try:
300 299
                 if not self.paused_handler:
301 300
                     self._assignHandlers()
@@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
699 698
             self.log.exception("Exception in DeletedNodeWorker:")
700 699
 
701 700
 
701
+class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
702
+
703
+    def __init__(self, nodepool, interval):
704
+        super().__init__(nodepool, interval, name='StatsWorker')
705
+        self.log = logging.getLogger('nodepool.StatsWorker')
706
+        self.stats_event = threading.Event()
707
+        self.election = None
708
+
709
+    def stop(self):
710
+        self._running = False
711
+        if self.election is not None:
712
+            self.log.debug('Cancel leader election')
713
+            self.election.cancel()
714
+        self.stats_event.set()
715
+        super().stop()
716
+
717
+    def _run(self):
718
+        try:
719
+            stats.StatsReporter.__init__(self)
720
+
721
+            if not self._statsd:
722
+                return
723
+
724
+            if self.election is None:
725
+                zk = self._nodepool.getZK()
726
+                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
727
+                self.election = zk.getStatsElection(identifier)
728
+
729
+            if not self._running:
730
+                return
731
+
732
+            self.election.run(self._run_stats)
733
+
734
+        except Exception:
735
+            self.log.exception('Exception in StatsWorker:')
736
+
737
+    def _run_stats(self):
738
+        self.log.info('Won stats reporter election')
739
+
740
+        # enable us getting events
741
+        zk = self._nodepool.getZK()
742
+        zk.setNodeStatsEvent(self.stats_event)
743
+
744
+        while self._running:
745
+            signaled = self.stats_event.wait()
746
+
747
+            if not self._running:
748
+                break
749
+
750
+            if not signaled:
751
+                continue
752
+
753
+            self.log.debug('Updating stats')
754
+            self.stats_event.clear()
755
+            try:
756
+                self.updateNodeStats(zk)
757
+            except Exception:
758
+                self.log.exception("Exception while reporting stats:")
759
+            time.sleep(1)
760
+
761
+        # Unregister from node stats events
762
+        zk.setNodeStatsEvent(None)
763
+
764
+
702 765
 class NodePool(threading.Thread):
703 766
     log = logging.getLogger("nodepool.NodePool")
704 767
 
@@ -710,6 +773,7 @@ class NodePool(threading.Thread):
710 773
         self.watermark_sleep = watermark_sleep
711 774
         self.cleanup_interval = 60
712 775
         self.delete_interval = 5
776
+        self.stats_interval = 5
713 777
         self._stopped = False
714 778
         self._stop_event = threading.Event()
715 779
         self.config = None
@@ -718,6 +782,7 @@ class NodePool(threading.Thread):
718 782
         self._pool_threads = {}
719 783
         self._cleanup_thread = None
720 784
         self._delete_thread = None
785
+        self._stats_thread = None
721 786
         self._submittedRequests = {}
722 787
 
723 788
     def stop(self):
@@ -738,6 +803,10 @@ class NodePool(threading.Thread):
738 803
             self._delete_thread.stop()
739 804
             self._delete_thread.join()
740 805
 
806
+        if self._stats_thread:
807
+            self._stats_thread.stop()
808
+            self._stats_thread.join()
809
+
741 810
         # Don't let stop() return until all pool threads have been
742 811
         # terminated.
743 812
         self.log.debug("Stopping pool threads")
@@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
950 1019
                         self, self.delete_interval)
951 1020
                     self._delete_thread.start()
952 1021
 
1022
+                if not self._stats_thread:
1023
+                    self._stats_thread = StatsWorker(self, self.stats_interval)
1024
+                    self._stats_thread.start()
1025
+
953 1026
                 # Stop any PoolWorker threads if the pool was removed
954 1027
                 # from the config.
955 1028
                 pool_keys = set()

+ 23
- 13
nodepool/stats.py View File

@@ -85,39 +85,40 @@ class StatsReporter(object):
85 85
             pipeline.incr(key)
86 86
         pipeline.send()
87 87
 
88
-    def updateNodeStats(self, zk_conn, provider):
88
+    def updateNodeStats(self, zk_conn):
89 89
         '''
90 90
         Refresh statistics for all known nodes.
91 91
 
92 92
         :param ZooKeeper zk_conn: A ZooKeeper connection object.
93
-        :param Provider provider: A config Provider object.
94 93
         '''
95 94
         if not self._statsd:
96 95
             return
97 96
 
98 97
         states = {}
99 98
 
99
+        launchers = zk_conn.getRegisteredLaunchers()
100
+        labels = set()
101
+        for launcher in launchers:
102
+            labels.update(launcher.supported_labels)
103
+        providers = set()
104
+        for launcher in launchers:
105
+            providers.add(launcher.provider_name)
106
+
100 107
         # Initialize things we know about to zero
101 108
         for state in zk.Node.VALID_STATES:
102 109
             key = 'nodepool.nodes.%s' % state
103 110
             states[key] = 0
104
-            key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
105
-            states[key] = 0
111
+            for provider in providers:
112
+                key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
113
+                states[key] = 0
106 114
 
107 115
         # Initialize label stats to 0
108
-        for label in provider.getSupportedLabels():
116
+        for label in labels:
109 117
             for state in zk.Node.VALID_STATES:
110 118
                 key = 'nodepool.label.%s.nodes.%s' % (label, state)
111 119
                 states[key] = 0
112 120
 
113
-        # Note that we intentionally don't use caching here because we don't
114
-        # know when the next update will happen and thus need to report the
115
-        # correct most recent state. Otherwise we can end up in reporting
116
-        # a gauge with a node in state deleting = 1 and never update this for
117
-        # a long time.
118
-        # TODO(tobiash): Changing updateNodeStats to just run periodically will
119
-        # resolve this and we can operate on cached data.
120
-        for node in zk_conn.nodeIterator(cached=False):
121
+        for node in zk_conn.nodeIterator():
121 122
             # nodepool.nodes.STATE
122 123
             key = 'nodepool.nodes.%s' % node.state
123 124
             states[key] += 1
@@ -145,9 +146,18 @@ class StatsReporter(object):
145 146
         for key, count in states.items():
146 147
             pipeline.gauge(key, count)
147 148
 
149
+        pipeline.send()
150
+
151
+    def updateProviderLimits(self, provider):
152
+        if not self._statsd:
153
+            return
154
+
155
+        pipeline = self._statsd.pipeline()
156
+
148 157
         # nodepool.provider.PROVIDER.max_servers
149 158
         key = 'nodepool.provider.%s.max_servers' % provider.name
150 159
         max_servers = sum([p.max_servers for p in provider.pools.values()
151 160
                            if p.max_servers])
152 161
         pipeline.gauge(key, max_servers)
162
+
153 163
         pipeline.send()

+ 1
- 0
nodepool/tests/__init__.py View File

@@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
207 207
                      'fake-provider3',
208 208
                      'CleanupWorker',
209 209
                      'DeletedNodeWorker',
210
+                     'StatsWorker',
210 211
                      'pydevd.CommandThread',
211 212
                      'pydevd.Reader',
212 213
                      'pydevd.Writer',

+ 28
- 0
nodepool/zk.py View File

@@ -23,6 +23,7 @@ from kazoo import exceptions as kze
23 23
 from kazoo.handlers.threading import KazooTimeoutError
24 24
 from kazoo.recipe.lock import Lock
25 25
 from kazoo.recipe.cache import TreeCache, TreeEvent
26
+from kazoo.recipe.election import Election
26 27
 
27 28
 from nodepool import exceptions as npe
28 29
 
@@ -164,6 +165,7 @@ class Launcher(Serializable):
164 165
 
165 166
     def __init__(self):
166 167
         self.id = None
168
+        self.provider_name = None
167 169
         self._supported_labels = set()
168 170
 
169 171
     def __eq__(self, other):
@@ -186,6 +188,7 @@ class Launcher(Serializable):
186 188
     def toDict(self):
187 189
         d = {}
188 190
         d['id'] = self.id
191
+        d['provider_name'] = self.provider_name
189 192
         # sets are not JSON serializable, so use a sorted list
190 193
         d['supported_labels'] = sorted(self.supported_labels)
191 194
         return d
@@ -194,6 +197,10 @@ class Launcher(Serializable):
194 197
     def fromDict(d):
195 198
         obj = Launcher()
196 199
         obj.id = d.get('id')
200
+        # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
201
+        #                having a full nodepool shutdown on upgrade. It can be
202
+        #                removed later.
203
+        obj.provider_name = d.get('provider_name', 'unknown')
197 204
         obj.supported_labels = set(d.get('supported_labels', []))
198 205
         return obj
199 206
 
@@ -689,6 +696,7 @@ class ZooKeeper(object):
689 696
     NODE_ROOT = "/nodepool/nodes"
690 697
     REQUEST_ROOT = "/nodepool/requests"
691 698
     REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
699
+    ELECTION_ROOT = "/nodepool/elections"
692 700
 
693 701
     # Log zookeeper retry every 10 seconds
694 702
     retry_log_rate = 10
@@ -706,10 +714,15 @@ class ZooKeeper(object):
706 714
         self._cached_node_requests = {}
707 715
         self.enable_cache = enable_cache
708 716
 
717
+        self.node_stats_event = None
718
+
709 719
     # =======================================================================
710 720
     # Private Methods
711 721
     # =======================================================================
712 722
 
723
+    def _electionPath(self, election):
724
+        return "%s/%s" % (self.ELECTION_ROOT, election)
725
+
713 726
     def _imagePath(self, image):
714 727
         return "%s/%s" % (self.IMAGE_ROOT, image)
715 728
 
@@ -2102,6 +2115,10 @@ class ZooKeeper(object):
2102 2115
                 node = Node.fromDict(d, node_id)
2103 2116
                 node.stat = event.event_data.stat
2104 2117
                 self._cached_nodes[node_id] = node
2118
+
2119
+            # set the stats event so the stats reporting thread can act upon it
2120
+            if self.node_stats_event is not None:
2121
+                self.node_stats_event.set()
2105 2122
         elif event.event_type == TreeEvent.NODE_REMOVED:
2106 2123
             try:
2107 2124
                 del self._cached_nodes[node_id]
@@ -2109,6 +2126,13 @@ class ZooKeeper(object):
2109 2126
                 # If it's already gone, don't care
2110 2127
                 pass
2111 2128
 
2129
+            # set the stats event so the stats reporting thread can act upon it
2130
+            if self.node_stats_event is not None:
2131
+                self.node_stats_event.set()
2132
+
2133
+    def setNodeStatsEvent(self, event):
2134
+        self.node_stats_event = event
2135
+
2112 2136
     def requestCacheListener(self, event):
2113 2137
 
2114 2138
         if hasattr(event.event_data, 'path'):
@@ -2154,3 +2178,7 @@ class ZooKeeper(object):
2154 2178
             except KeyError:
2155 2179
                 # If it's already gone, don't care
2156 2180
                 pass
2181
+
2182
+    def getStatsElection(self, identifier):
2183
+        path = self._electionPath('stats')
2184
+        return Election(self.client, path, identifier)

Loading…
Cancel
Save