Browse Source

Merge "Asynchronously update node statistics"

tags/3.4.0
Zuul 5 months ago
parent
commit
81936cd818
5 changed files with 133 additions and 22 deletions
  1. 0
    1
      nodepool/driver/utils.py
  2. 81
    8
      nodepool/launcher.py
  3. 23
    13
      nodepool/stats.py
  4. 1
    0
      nodepool/tests/__init__.py
  5. 28
    0
      nodepool/zk.py

+ 0
- 1
nodepool/driver/utils.py View File

@@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
94 94
         try:
95 95
             dt = int((time.monotonic() - start_time) * 1000)
96 96
             self.recordLaunchStats(statsd_key, dt)
97
-            self.updateNodeStats(self.zk, self.provider_config)
98 97
         except Exception:
99 98
             self.log.exception("Exception while reporting stats:")
100 99
 

+ 81
- 8
nodepool/launcher.py View File

@@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
46 46
 SUSPEND_WAIT_TIME = 30
47 47
 
48 48
 
49
-class NodeDeleter(threading.Thread, stats.StatsReporter):
49
+class NodeDeleter(threading.Thread):
50 50
     log = logging.getLogger("nodepool.NodeDeleter")
51 51
 
52 52
     def __init__(self, zk, provider_manager, node):
53 53
         threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
54 54
                                   (node.provider, node.external_id))
55
-        stats.StatsReporter.__init__(self)
56 55
         self._zk = zk
57 56
         self._provider_manager = provider_manager
58 57
         self._node = node
@@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
109 108
 
110 109
         self.delete(self._zk, self._provider_manager, self._node, node_exists)
111 110
 
112
-        try:
113
-            self.updateNodeStats(self._zk, self._provider_manager.provider)
114
-        except Exception:
115
-            self.log.exception("Exception while reporting stats:")
116 111
 
117
-
118
-class PoolWorker(threading.Thread):
112
+class PoolWorker(threading.Thread, stats.StatsReporter):
119 113
     '''
120 114
     Class that manages node requests for a single provider pool.
121 115
 
@@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
143 137
         self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
144 138
                                          os.getpid(),
145 139
                                          self.name)
140
+        stats.StatsReporter.__init__(self)
146 141
 
147 142
     # ---------------------------------------------------------------
148 143
     # Private methods
@@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
294 289
             launcher.id = self.launcher_id
295 290
             for prov_cfg in self.nodepool.config.providers.values():
296 291
                 launcher.supported_labels.update(prov_cfg.getSupportedLabels())
292
+            launcher.provider_name = self.provider_name
297 293
             self.zk.registerLauncher(launcher)
298 294
 
295
+            self.updateProviderLimits(
296
+                self.nodepool.config.providers.get(self.provider_name))
297
+
299 298
             try:
300 299
                 if not self.paused_handler:
301 300
                     self._assignHandlers()
@@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
699 698
             self.log.exception("Exception in DeletedNodeWorker:")
700 699
 
701 700
 
701
+class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
702
+
703
+    def __init__(self, nodepool, interval):
704
+        super().__init__(nodepool, interval, name='StatsWorker')
705
+        self.log = logging.getLogger('nodepool.StatsWorker')
706
+        self.stats_event = threading.Event()
707
+        self.election = None
708
+
709
+    def stop(self):
710
+        self._running = False
711
+        if self.election is not None:
712
+            self.log.debug('Cancel leader election')
713
+            self.election.cancel()
714
+        self.stats_event.set()
715
+        super().stop()
716
+
717
+    def _run(self):
718
+        try:
719
+            stats.StatsReporter.__init__(self)
720
+
721
+            if not self._statsd:
722
+                return
723
+
724
+            if self.election is None:
725
+                zk = self._nodepool.getZK()
726
+                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
727
+                self.election = zk.getStatsElection(identifier)
728
+
729
+            if not self._running:
730
+                return
731
+
732
+            self.election.run(self._run_stats)
733
+
734
+        except Exception:
735
+            self.log.exception('Exception in StatsWorker:')
736
+
737
+    def _run_stats(self):
738
+        self.log.info('Won stats reporter election')
739
+
740
+        # enable us getting events
741
+        zk = self._nodepool.getZK()
742
+        zk.setNodeStatsEvent(self.stats_event)
743
+
744
+        while self._running:
745
+            signaled = self.stats_event.wait()
746
+
747
+            if not self._running:
748
+                break
749
+
750
+            if not signaled:
751
+                continue
752
+
753
+            self.log.debug('Updating stats')
754
+            self.stats_event.clear()
755
+            try:
756
+                self.updateNodeStats(zk)
757
+            except Exception:
758
+                self.log.exception("Exception while reporting stats:")
759
+            time.sleep(1)
760
+
761
+        # Unregister from node stats events
762
+        zk.setNodeStatsEvent(None)
763
+
764
+
702 765
 class NodePool(threading.Thread):
703 766
     log = logging.getLogger("nodepool.NodePool")
704 767
 
@@ -710,6 +773,7 @@ class NodePool(threading.Thread):
710 773
         self.watermark_sleep = watermark_sleep
711 774
         self.cleanup_interval = 60
712 775
         self.delete_interval = 5
776
+        self.stats_interval = 5
713 777
         self._stopped = False
714 778
         self._stop_event = threading.Event()
715 779
         self.config = None
@@ -718,6 +782,7 @@ class NodePool(threading.Thread):
718 782
         self._pool_threads = {}
719 783
         self._cleanup_thread = None
720 784
         self._delete_thread = None
785
+        self._stats_thread = None
721 786
         self._submittedRequests = {}
722 787
 
723 788
     def stop(self):
@@ -738,6 +803,10 @@ class NodePool(threading.Thread):
738 803
             self._delete_thread.stop()
739 804
             self._delete_thread.join()
740 805
 
806
+        if self._stats_thread:
807
+            self._stats_thread.stop()
808
+            self._stats_thread.join()
809
+
741 810
         # Don't let stop() return until all pool threads have been
742 811
         # terminated.
743 812
         self.log.debug("Stopping pool threads")
@@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
950 1019
                         self, self.delete_interval)
951 1020
                     self._delete_thread.start()
952 1021
 
1022
+                if not self._stats_thread:
1023
+                    self._stats_thread = StatsWorker(self, self.stats_interval)
1024
+                    self._stats_thread.start()
1025
+
953 1026
                 # Stop any PoolWorker threads if the pool was removed
954 1027
                 # from the config.
955 1028
                 pool_keys = set()

+ 23
- 13
nodepool/stats.py View File

@@ -85,39 +85,40 @@ class StatsReporter(object):
85 85
             pipeline.incr(key)
86 86
         pipeline.send()
87 87
 
88
-    def updateNodeStats(self, zk_conn, provider):
88
+    def updateNodeStats(self, zk_conn):
89 89
         '''
90 90
         Refresh statistics for all known nodes.
91 91
 
92 92
         :param ZooKeeper zk_conn: A ZooKeeper connection object.
93
-        :param Provider provider: A config Provider object.
94 93
         '''
95 94
         if not self._statsd:
96 95
             return
97 96
 
98 97
         states = {}
99 98
 
99
+        launchers = zk_conn.getRegisteredLaunchers()
100
+        labels = set()
101
+        for launcher in launchers:
102
+            labels.update(launcher.supported_labels)
103
+        providers = set()
104
+        for launcher in launchers:
105
+            providers.add(launcher.provider_name)
106
+
100 107
         # Initialize things we know about to zero
101 108
         for state in zk.Node.VALID_STATES:
102 109
             key = 'nodepool.nodes.%s' % state
103 110
             states[key] = 0
104
-            key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
105
-            states[key] = 0
111
+            for provider in providers:
112
+                key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
113
+                states[key] = 0
106 114
 
107 115
         # Initialize label stats to 0
108
-        for label in provider.getSupportedLabels():
116
+        for label in labels:
109 117
             for state in zk.Node.VALID_STATES:
110 118
                 key = 'nodepool.label.%s.nodes.%s' % (label, state)
111 119
                 states[key] = 0
112 120
 
113
-        # Note that we intentionally don't use caching here because we don't
114
-        # know when the next update will happen and thus need to report the
115
-        # correct most recent state. Otherwise we can end up in reporting
116
-        # a gauge with a node in state deleting = 1 and never update this for
117
-        # a long time.
118
-        # TODO(tobiash): Changing updateNodeStats to just run periodically will
119
-        # resolve this and we can operate on cached data.
120
-        for node in zk_conn.nodeIterator(cached=False):
121
+        for node in zk_conn.nodeIterator():
121 122
             # nodepool.nodes.STATE
122 123
             key = 'nodepool.nodes.%s' % node.state
123 124
             states[key] += 1
@@ -145,9 +146,18 @@ class StatsReporter(object):
145 146
         for key, count in states.items():
146 147
             pipeline.gauge(key, count)
147 148
 
149
+        pipeline.send()
150
+
151
+    def updateProviderLimits(self, provider):
152
+        if not self._statsd:
153
+            return
154
+
155
+        pipeline = self._statsd.pipeline()
156
+
148 157
         # nodepool.provider.PROVIDER.max_servers
149 158
         key = 'nodepool.provider.%s.max_servers' % provider.name
150 159
         max_servers = sum([p.max_servers for p in provider.pools.values()
151 160
                            if p.max_servers])
152 161
         pipeline.gauge(key, max_servers)
162
+
153 163
         pipeline.send()

+ 1
- 0
nodepool/tests/__init__.py View File

@@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
207 207
                      'fake-provider3',
208 208
                      'CleanupWorker',
209 209
                      'DeletedNodeWorker',
210
+                     'StatsWorker',
210 211
                      'pydevd.CommandThread',
211 212
                      'pydevd.Reader',
212 213
                      'pydevd.Writer',

+ 28
- 0
nodepool/zk.py View File

@@ -23,6 +23,7 @@ from kazoo import exceptions as kze
23 23
 from kazoo.handlers.threading import KazooTimeoutError
24 24
 from kazoo.recipe.lock import Lock
25 25
 from kazoo.recipe.cache import TreeCache, TreeEvent
26
+from kazoo.recipe.election import Election
26 27
 
27 28
 from nodepool import exceptions as npe
28 29
 
@@ -164,6 +165,7 @@ class Launcher(Serializable):
164 165
 
165 166
     def __init__(self):
166 167
         self.id = None
168
+        self.provider_name = None
167 169
         self._supported_labels = set()
168 170
 
169 171
     def __eq__(self, other):
@@ -186,6 +188,7 @@ class Launcher(Serializable):
186 188
     def toDict(self):
187 189
         d = {}
188 190
         d['id'] = self.id
191
+        d['provider_name'] = self.provider_name
189 192
         # sets are not JSON serializable, so use a sorted list
190 193
         d['supported_labels'] = sorted(self.supported_labels)
191 194
         return d
@@ -194,6 +197,10 @@ class Launcher(Serializable):
194 197
     def fromDict(d):
195 198
         obj = Launcher()
196 199
         obj.id = d.get('id')
200
+        # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
201
+        #                having a full nodepool shutdown on upgrade. It can be
202
+        #                removed later.
203
+        obj.provider_name = d.get('provider_name', 'unknown')
197 204
         obj.supported_labels = set(d.get('supported_labels', []))
198 205
         return obj
199 206
 
@@ -693,6 +700,7 @@ class ZooKeeper(object):
693 700
     NODE_ROOT = "/nodepool/nodes"
694 701
     REQUEST_ROOT = "/nodepool/requests"
695 702
     REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
703
+    ELECTION_ROOT = "/nodepool/elections"
696 704
 
697 705
     # Log zookeeper retry every 10 seconds
698 706
     retry_log_rate = 10
@@ -710,10 +718,15 @@ class ZooKeeper(object):
710 718
         self._cached_node_requests = {}
711 719
         self.enable_cache = enable_cache
712 720
 
721
+        self.node_stats_event = None
722
+
713 723
     # =======================================================================
714 724
     # Private Methods
715 725
     # =======================================================================
716 726
 
727
+    def _electionPath(self, election):
728
+        return "%s/%s" % (self.ELECTION_ROOT, election)
729
+
717 730
     def _imagePath(self, image):
718 731
         return "%s/%s" % (self.IMAGE_ROOT, image)
719 732
 
@@ -2106,6 +2119,10 @@ class ZooKeeper(object):
2106 2119
                 node = Node.fromDict(d, node_id)
2107 2120
                 node.stat = event.event_data.stat
2108 2121
                 self._cached_nodes[node_id] = node
2122
+
2123
+            # set the stats event so the stats reporting thread can act upon it
2124
+            if self.node_stats_event is not None:
2125
+                self.node_stats_event.set()
2109 2126
         elif event.event_type == TreeEvent.NODE_REMOVED:
2110 2127
             try:
2111 2128
                 del self._cached_nodes[node_id]
@@ -2113,6 +2130,13 @@ class ZooKeeper(object):
2113 2130
                 # If it's already gone, don't care
2114 2131
                 pass
2115 2132
 
2133
+            # set the stats event so the stats reporting thread can act upon it
2134
+            if self.node_stats_event is not None:
2135
+                self.node_stats_event.set()
2136
+
2137
+    def setNodeStatsEvent(self, event):
2138
+        self.node_stats_event = event
2139
+
2116 2140
     def requestCacheListener(self, event):
2117 2141
 
2118 2142
         if hasattr(event.event_data, 'path'):
@@ -2158,3 +2182,7 @@ class ZooKeeper(object):
2158 2182
             except KeyError:
2159 2183
                 # If it's already gone, don't care
2160 2184
                 pass
2185
+
2186
+    def getStatsElection(self, identifier):
2187
+        path = self._electionPath('stats')
2188
+        return Election(self.client, path, identifier)

Loading…
Cancel
Save