Merge "Poll OpenStack resources in background"

This commit is contained in:
Jenkins 2017-02-09 12:37:57 +00:00 committed by Gerrit Code Review
commit e805c9fe58
2 changed files with 105 additions and 27 deletions

View File

@ -17,6 +17,7 @@ from functools import wraps
import json import json
import signal import signal
import subprocess import subprocess
import threading
import time import time
import traceback import traceback
@ -271,3 +272,47 @@ class CephBase(Base):
if node.key == "Cluster": if node.key == "Cluster":
self.cluster = node.values[0] self.cluster = node.values[0]
self.plugin_instance = self.cluster self.plugin_instance = self.cluster
class AsyncPoller(threading.Thread):
"""Execute an independant thread to execute a function periodically
Args:
collectd: used for logging
polling_function: a function to execute periodically
interval: the interval in second
name: (optional) the name of the thread
"""
def __init__(self, collectd, polling_function, interval, name=None):
super(AsyncPoller, self).__init__(name=name)
self.collectd = collectd
self.polling_function = polling_function
self.interval = interval
self._results = None
def run(self):
self.collectd.info('Starting thread {}'.format(self.name))
while True:
try:
started_at = time.time()
self._results = self.polling_function()
tosleep = self.interval - (time.time() - started_at)
if tosleep > 0:
time.sleep(tosleep)
else:
self.collectd.warning(
'Polling took more than {}s for {}'.format(
self.interval, self.name
)
)
except Exception as e:
self._results = None
self.collectd.error('{} fails: {}'.format(self.name, e))
time.sleep(10)
def get_results(self):
return self._results

View File

@ -163,7 +163,9 @@ class CollectdPlugin(base.Base):
self.max_retries = 2 self.max_retries = 2
self.os_client = None self.os_client = None
self.extra_config = {} self.extra_config = {}
self._threads = {}
self.pagination_limit = None self.pagination_limit = None
self.polling_interval = 60
def _build_url(self, service, resource): def _build_url(self, service, resource):
s = (self.get_service(service) or {}) s = (self.get_service(service) or {})
@ -278,6 +280,9 @@ class CollectdPlugin(base.Base):
keystone_url = node.values[0] keystone_url = node.values[0]
elif node.key == 'PaginationLimit': elif node.key == 'PaginationLimit':
self.pagination_limit = int(node.values[0]) self.pagination_limit = int(node.values[0])
elif node.key == 'PollingInterval':
self.polling_interval = int(node.values[0])
self.os_client = OSClient(username, password, tenant_name, self.os_client = OSClient(username, password, tenant_name,
keystone_url, self.timeout, self.logger, keystone_url, self.timeout, self.logger,
self.max_retries) self.max_retries)
@ -302,46 +307,74 @@ class CollectdPlugin(base.Base):
if detail: if detail:
resource = '{}/detail'.format(resource) resource = '{}/detail'.format(resource)
url = self._build_url(project, resource)
if not url:
return
opts = {} opts = {}
if self.pagination_limit: if self.pagination_limit:
opts['limit'] = self.pagination_limit opts['limit'] = self.pagination_limit
opts.update(params) opts.update(params)
objs = []
while True: def openstack_api_poller():
r = self.os_client.make_request('get', url, params=opts) _objects = []
if not r or object_name not in r.json(): _opts = {}
self.logger.warning('Could not find %s %s' % (project, _opts.update(opts)
object_name)) while True:
return objs r = self.get(project, resource, params=_opts)
if not r or object_name not in r.json():
if r is None:
err = ''
else:
err = r.text
self.collectd.warning('Could not find {}: {} {}'.format(
project, object_name, err
))
# Avoid to provide incomplete data by reseting current
# set.
_objects = []
break
resp = r.json() resp = r.json()
bulk_objs = resp.get(object_name) bulk_objs = resp.get(object_name)
if not bulk_objs:
# emtpy list
break
if not bulk_objs: _objects.extend(bulk_objs)
break
objs.extend(bulk_objs) links = resp.get('{}_links'.format(object_name))
if links is None or self.pagination_limit is None:
# Either the pagination is not supported or there is
# no more data
# In both cases, we got at this stage all the data we
# can have.
break
links = resp.get('{}_links'.format(object_name)) # if there is no 'next' link in the response, all data has
if links is None or self.pagination_limit is None: # been read.
# Either the pagination is not supported or there is no more if len([i for i in links if i.get('rel') == 'next']) == 0:
# data break
break
# if there is no 'next' link in the response, all data has been _opts['marker'] = bulk_objs[-1]['id']
# read.
if len([i for i in links if i.get('rel') == 'next']) == 0:
break
opts['marker'] = bulk_objs[-1]['id'] return _objects
return objs poller_id = '{}:{}'.format(project, resource)
if poller_id not in self._threads:
t = base.AsyncPoller(self.collectd,
openstack_api_poller,
self.polling_interval,
poller_id)
t.start()
self._threads[poller_id] = t
t = self._threads[poller_id]
if not t.is_alive():
self.logger.warning("Unexpected end of the thread {}".format(
t.name))
del self._threads[poller_id]
return []
results = t.get_results()
return [] if results is None else results
def count_objects_group_by(self, def count_objects_group_by(self,
list_object, list_object,