Browse Source

Poll OpenStack resources in background

The collectd plugin spawns threads responsible for polling APIs.

Change-Id: I6d32862835fc86ac81c6e15fcde83cc113627a88
Swann Croiset 2 years ago
parent
commit
4fa3fe8cbe

+ 45
- 0
deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py View File

@@ -17,6 +17,7 @@ from functools import wraps
17 17
 import json
18 18
 import signal
19 19
 import subprocess
20
+import threading
20 21
 import time
21 22
 import traceback
22 23
 
@@ -271,3 +272,47 @@ class CephBase(Base):
271 272
             if node.key == "Cluster":
272 273
                 self.cluster = node.values[0]
273 274
         self.plugin_instance = self.cluster
275
+
276
+
277
+class AsyncPoller(threading.Thread):
278
+    """Execute an independant thread to execute a function periodically
279
+
280
+       Args:
281
+           collectd: used for logging
282
+           polling_function: a function to execute periodically
283
+           interval: the interval in second
284
+           name: (optional) the name of the thread
285
+    """
286
+
287
+    def __init__(self, collectd, polling_function, interval, name=None):
288
+        super(AsyncPoller, self).__init__(name=name)
289
+        self.collectd = collectd
290
+        self.polling_function = polling_function
291
+        self.interval = interval
292
+        self._results = None
293
+
294
+    def run(self):
295
+        self.collectd.info('Starting thread {}'.format(self.name))
296
+        while True:
297
+            try:
298
+                started_at = time.time()
299
+
300
+                self._results = self.polling_function()
301
+
302
+                tosleep = self.interval - (time.time() - started_at)
303
+                if tosleep > 0:
304
+                    time.sleep(tosleep)
305
+                else:
306
+                    self.collectd.warning(
307
+                        'Polling took more than {}s for {}'.format(
308
+                            self.interval, self.name
309
+                        )
310
+                    )
311
+
312
+            except Exception as e:
313
+                self._results = None
314
+                self.collectd.error('{} fails: {}'.format(self.name, e))
315
+                time.sleep(10)
316
+
317
+    def get_results(self):
318
+        return self._results

+ 67
- 34
deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py View File

@@ -163,7 +163,9 @@ class CollectdPlugin(base.Base):
163 163
         self.max_retries = 2
164 164
         self.os_client = None
165 165
         self.extra_config = {}
166
+        self._threads = {}
166 167
         self.pagination_limit = None
168
+        self.polling_interval = 60
167 169
 
168 170
     def _build_url(self, service, resource):
169 171
         s = (self.get_service(service) or {})
@@ -278,6 +280,9 @@ class CollectdPlugin(base.Base):
278 280
                 keystone_url = node.values[0]
279 281
             elif node.key == 'PaginationLimit':
280 282
                 self.pagination_limit = int(node.values[0])
283
+            elif node.key == 'PollingInterval':
284
+                self.polling_interval = int(node.values[0])
285
+
281 286
         self.os_client = OSClient(username, password, tenant_name,
282 287
                                   keystone_url, self.timeout, self.logger,
283 288
                                   self.max_retries)
@@ -302,46 +307,74 @@ class CollectdPlugin(base.Base):
302 307
         if detail:
303 308
             resource = '{}/detail'.format(resource)
304 309
 
305
-        url = self._build_url(project, resource)
306
-        if not url:
307
-            return
308
-
309 310
         opts = {}
310 311
         if self.pagination_limit:
311 312
             opts['limit'] = self.pagination_limit
312 313
 
313 314
         opts.update(params)
314
-        objs = []
315
-
316
-        while True:
317
-            r = self.os_client.make_request('get', url, params=opts)
318
-            if not r or object_name not in r.json():
319
-                self.logger.warning('Could not find %s %s' % (project,
320
-                                                              object_name))
321
-                return objs
322
-
323
-            resp = r.json()
324
-            bulk_objs = resp.get(object_name)
325 315
 
326
-            if not bulk_objs:
327
-                break
328
-
329
-            objs.extend(bulk_objs)
330
-
331
-            links = resp.get('{}_links'.format(object_name))
332
-            if links is None or self.pagination_limit is None:
333
-                # Either the pagination is not supported or there is no more
334
-                # data
335
-                break
336
-
337
-            # if there is no 'next' link in the response, all data has been
338
-            # read.
339
-            if len([i for i in links if i.get('rel') == 'next']) == 0:
340
-                break
341
-
342
-            opts['marker'] = bulk_objs[-1]['id']
343
-
344
-        return objs
316
+        def openstack_api_poller():
317
+            _objects = []
318
+            _opts = {}
319
+            _opts.update(opts)
320
+            while True:
321
+                r = self.get(project, resource, params=_opts)
322
+                if not r or object_name not in r.json():
323
+                    if r is None:
324
+                        err = ''
325
+                    else:
326
+                        err = r.text
327
+                    self.collectd.warning('Could not find {}: {} {}'.format(
328
+                        project, object_name, err
329
+                    ))
330
+                    # Avoid to provide incomplete data by reseting current
331
+                    # set.
332
+                    _objects = []
333
+                    break
334
+
335
+                resp = r.json()
336
+                bulk_objs = resp.get(object_name)
337
+                if not bulk_objs:
338
+                    # emtpy list
339
+                    break
340
+
341
+                _objects.extend(bulk_objs)
342
+
343
+                links = resp.get('{}_links'.format(object_name))
344
+                if links is None or self.pagination_limit is None:
345
+                    # Either the pagination is not supported or there is
346
+                    # no more data
347
+                    # In both cases, we got at this stage all the data we
348
+                    # can have.
349
+                    break
350
+
351
+                # if there is no 'next' link in the response, all data has
352
+                # been read.
353
+                if len([i for i in links if i.get('rel') == 'next']) == 0:
354
+                    break
355
+
356
+                _opts['marker'] = bulk_objs[-1]['id']
357
+
358
+            return _objects
359
+
360
+        poller_id = '{}:{}'.format(project, resource)
361
+        if poller_id not in self._threads:
362
+            t = base.AsyncPoller(self.collectd,
363
+                                 openstack_api_poller,
364
+                                 self.polling_interval,
365
+                                 poller_id)
366
+            t.start()
367
+            self._threads[poller_id] = t
368
+
369
+        t = self._threads[poller_id]
370
+        if not t.is_alive():
371
+            self.logger.warning("Unexpected end of the thread {}".format(
372
+                t.name))
373
+            del self._threads[poller_id]
374
+            return []
375
+
376
+        results = t.get_results()
377
+        return [] if results is None else results
345 378
 
346 379
     def count_objects_group_by(self,
347 380
                                list_object,

Loading…
Cancel
Save