Factorize collectd Python plugins

The mechanism for dependent resources and notifications is needed by other plugins.

Change-Id: I24dd00583326689294dee4e4d1076c05f3c1e022
This commit is contained in:
Swann Croiset 2016-02-11 14:50:58 +01:00
parent 3f727dfe1b
commit dda7560d78
17 changed files with 93 additions and 91 deletions

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from functools import wraps
import json import json
import signal import signal
import subprocess import subprocess
@ -20,28 +21,53 @@ import sys
import time import time
import traceback import traceback
import collectd
INTERVAL = 10
# A decorator that will call the decorated function only when the plugin has
# detected that it is currently active.
def read_callback_wrapper(f):
    """Run the decorated method only while the plugin is collecting data.

    The wrapped method is invoked when ``self.do_collect_data`` is truthy and
    its return value is handed back to the caller. When the flag is falsy the
    method is skipped entirely and ``None`` is returned.

    :param f: the method to guard; it must accept ``self`` as first argument.
    :returns: the guarded method, with ``f``'s metadata preserved by ``wraps``.
    """
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        if not self.do_collect_data:
            # Plugin is currently inactive: skip the call.
            return None
        # Propagate the result instead of silently dropping it.
        return f(self, *args, **kwargs)
    return wrapper
class Base(object): class Base(object):
"""Base class for writing Python plugins.""" """Base class for writing Python plugins."""
FAIL = 0
OK = 1
UNKNOWN = 2
MAX_IDENTIFIER_LENGTH = 63 MAX_IDENTIFIER_LENGTH = 63
def __init__(self, *args, **kwargs): def __init__(self, collectd):
self.debug = False self.debug = False
self.timeout = 5 self.timeout = 5
self.max_retries = 3
self.logger = collectd self.logger = collectd
self.collectd = collectd
self.plugin = None self.plugin = None
self.plugin_instance = '' self.plugin_instance = ''
# attributes controlling whether the plugin is in collect mode or not
self.depends_on_resource = None
self.do_collect_data = True
def config_callback(self, conf): def config_callback(self, conf):
for node in conf.children: for node in conf.children:
if node.key == "Debug": if node.key == "Debug":
if node.values[0] in ['True', 'true']: if node.values[0] in ['True', 'true']:
self.debug = True self.debug = True
if node.key == "Timeout": elif node.key == "Timeout":
self.timeout = int(node.values[0]) self.timeout = int(node.values[0])
elif node.key == 'MaxRetries':
self.max_retries = int(node.values[0])
elif node.key == 'DependsOnResource':
self.depends_on_resource = node.values[0]
def read_callback(self): def read_callback(self):
try: try:
@ -60,6 +86,7 @@ class Base(object):
- 'values', a scalar number or a list of numbers if the type - 'values', a scalar number or a list of numbers if the type
defines several datasources. defines several datasources.
- 'type_instance' (optional) - 'type_instance' (optional)
- 'plugin_instance' (optional)
- 'type' (optional, default='gauge') - 'type' (optional, default='gauge')
For example: For example:
@ -82,7 +109,7 @@ class Base(object):
(self.plugin, type_instance[:24], len(type_instance), (self.plugin, type_instance[:24], len(type_instance),
self.MAX_IDENTIFIER_LENGTH)) self.MAX_IDENTIFIER_LENGTH))
v = collectd.Values( v = self.collectd.Values(
plugin=self.plugin, plugin=self.plugin,
type=metric.get('type', 'gauge'), type=metric.get('type', 'gauge'),
plugin_instance=self.plugin_instance, plugin_instance=self.plugin_instance,
@ -174,6 +201,31 @@ class Base(object):
if sys.version_info[0] == 2 and sys.version_info[1] <= 6: if sys.version_info[0] == 2 and sys.version_info[1] <= 6:
signal.signal(signal.SIGCHLD, signal.SIG_DFL) signal.signal(signal.SIGCHLD, signal.SIG_DFL)
def notification_callback(self, notification):
    """Switch data collection on or off from a resource notification.

    Nothing happens unless ``self.depends_on_resource`` is set. The
    notification message is parsed as JSON; when it is an object carrying
    both 'resource' and 'value' and the resource equals
    ``self.depends_on_resource``, ``self.do_collect_data`` becomes
    ``value > 0``. Messages that are not JSON, or that decode to something
    other than an object, are ignored.

    :param notification: object exposing the notification text as
        ``message``.
    """
    if not self.depends_on_resource:
        return

    try:
        data = json.loads(notification.message)
    except (TypeError, ValueError):
        # ValueError: the message is not JSON.
        # TypeError: the message is not text at all (e.g. None).
        return

    if not isinstance(data, dict):
        # Valid JSON but not an object (number, string, list, null): the
        # key lookups below would be meaningless or raise TypeError.
        return

    plugin_name = self.__class__.__name__
    if 'value' not in data:
        self.logger.warning(
            "%s: missing 'value' in notification" % plugin_name)
    elif 'resource' not in data:
        self.logger.warning(
            "%s: missing 'resource' in notification" % plugin_name)
    elif data['resource'] == self.depends_on_resource:
        do_collect_data = data['value'] > 0
        if self.do_collect_data != do_collect_data:
            # log only the transitions
            self.logger.notice("%s: do_collect_data=%s" %
                               (plugin_name, do_collect_data))
        self.do_collect_data = do_collect_data
class CephBase(Base): class CephBase(Base):

View File

@ -80,7 +80,7 @@ class CephOSDPerfPlugin(base.CephBase):
'values': self.convert_to_collectd_value(stats[k]) 'values': self.convert_to_collectd_value(stats[k])
} }
plugin = CephOSDPerfPlugin() plugin = CephOSDPerfPlugin(collectd)
def init_callback(): def init_callback():

View File

@ -47,7 +47,7 @@ class CephOSDStatsPlugin(base.CephBase):
osd['fs_perf_stat']['commit_latency_ms']], osd['fs_perf_stat']['commit_latency_ms']],
} }
plugin = CephOSDStatsPlugin() plugin = CephOSDStatsPlugin(collectd)
def init_callback(): def init_callback():

View File

@ -78,7 +78,7 @@ class CephMonPlugin(base.CephBase):
'values': state['count'] 'values': state['count']
} }
plugin = CephMonPlugin() plugin = CephMonPlugin(collectd)
def init_callback(): def init_callback():

View File

@ -116,7 +116,7 @@ class CephPoolPlugin(base.CephBase):
'values': [_up, _down, _in, _out] 'values': [_up, _down, _in, _out]
} }
plugin = CephPoolPlugin() plugin = CephPoolPlugin(collectd)
def init_callback(): def init_callback():

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for checking the status of OpenStack API services # Collectd plugin for checking the status of OpenStack API services
import collectd import collectd
import base
import openstack import openstack
from urlparse import urlparse from urlparse import urlparse
@ -26,10 +28,6 @@ INTERVAL = openstack.INTERVAL
class APICheckPlugin(openstack.CollectdPlugin): class APICheckPlugin(openstack.CollectdPlugin):
"""Class to check the status of OpenStack API services.""" """Class to check the status of OpenStack API services."""
FAIL = 0
OK = 1
UNKNOWN = 2
# TODO(all): sahara, murano # TODO(all): sahara, murano
CHECK_MAP = { CHECK_MAP = {
'keystone': { 'keystone': {
@ -94,7 +92,7 @@ class APICheckPlugin(openstack.CollectdPlugin):
'region': service['region'] 'region': service['region']
} }
@openstack.read_callback_wrapper @base.read_callback_wrapper
def read_callback(self): def read_callback(self):
for item in self.check_api(): for item in self.check_api():
if item['status'] == self.UNKNOWN: if item['status'] == self.UNKNOWN:

View File

@ -37,7 +37,6 @@ class ElasticsearchClusterHealthPlugin(base.Base):
self.plugin = NAME self.plugin = NAME
self.address = '127.0.0.1' self.address = '127.0.0.1'
self.port = 9200 self.port = 9200
self.max_retries = 3
self.session = requests.Session() self.session = requests.Session()
self.url = None self.url = None
self.session.mount( self.session.mount(
@ -86,7 +85,7 @@ class ElasticsearchClusterHealthPlugin(base.Base):
'values': data[metric] 'values': data[metric]
} }
plugin = ElasticsearchClusterHealthPlugin() plugin = ElasticsearchClusterHealthPlugin(collectd)
def init_callback(): def init_callback():

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for getting hypervisor statistics from Nova # Collectd plugin for getting hypervisor statistics from Nova
import collectd import collectd
import base
import openstack import openstack
PLUGIN_NAME = 'hypervisor_stats' PLUGIN_NAME = 'hypervisor_stats'
@ -55,7 +57,7 @@ class HypervisorStatsPlugin(openstack.CollectdPlugin):
v.host = host v.host = host
v.dispatch() v.dispatch()
@openstack.read_callback_wrapper @base.read_callback_wrapper
def read_callback(self): def read_callback(self):
r = self.get('nova', 'os-hypervisors/detail') r = self.get('nova', 'os-hypervisors/detail')
if not r: if not r:

View File

@ -128,7 +128,7 @@ class InfluxDBClusterPlugin(base.Base):
} }
plugin = InfluxDBClusterPlugin() plugin = InfluxDBClusterPlugin(collectd)
def init_callback(): def init_callback():

View File

@ -12,13 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import base
import datetime import datetime
import dateutil.parser import dateutil.parser
import dateutil.tz import dateutil.tz
from functools import wraps
import requests import requests
import simplejson as json import simplejson as json
# By default, query OpenStack API endpoints every 50 seconds. We choose a value # By default, query OpenStack API endpoints every 50 seconds. We choose a value
# less than the default group by interval (which is 60 seconds) to avoid gaps # less than the default group by interval (which is 60 seconds) to avoid gaps
# in the Grafana graphs. # in the Grafana graphs.
@ -145,28 +147,12 @@ class OSClient(object):
return r return r
# A decorator that will call the decorated function only when the plugin has class CollectdPlugin(base.Base):
# detected that it is currently active.
def read_callback_wrapper(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
if self.do_collect_data:
f(self, *args, **kwargs)
return wrapper def __init__(self, *args, **kwargs):
super(CollectdPlugin, self).__init__(*args, **kwargs)
class CollectdPlugin(object):
def __init__(self, logger):
self.os_client = None self.os_client = None
self.logger = logger
self.timeout = 5
self.max_retries = 3
self.extra_config = {} self.extra_config = {}
# attributes controlling whether the plugin is in collect mode or not
self.do_collect_data = True
self.depends_on_resource = None
def _build_url(self, service, resource): def _build_url(self, service, resource):
s = (self.get_service(service) or {}) s = (self.get_service(service) or {})
@ -209,12 +195,9 @@ class CollectdPlugin(object):
if x['name'] == service_name), None) if x['name'] == service_name), None)
def config_callback(self, config): def config_callback(self, config):
super(CollectdPlugin, self).config_callback(config)
for node in config.children: for node in config.children:
if node.key == 'Timeout': if node.key == 'Username':
self.timeout = int(node.values[0])
elif node.key == 'MaxRetries':
self.max_retries = int(node.values[0])
elif node.key == 'Username':
username = node.values[0] username = node.values[0]
elif node.key == 'Password': elif node.key == 'Password':
password = node.values[0] password = node.values[0]
@ -222,37 +205,10 @@ class CollectdPlugin(object):
tenant_name = node.values[0] tenant_name = node.values[0]
elif node.key == 'KeystoneUrl': elif node.key == 'KeystoneUrl':
keystone_url = node.values[0] keystone_url = node.values[0]
elif node.key == 'DependsOnResource':
self.depends_on_resource = node.values[0]
self.os_client = OSClient(username, password, tenant_name, self.os_client = OSClient(username, password, tenant_name,
keystone_url, self.timeout, self.logger, keystone_url, self.timeout, self.logger,
self.max_retries) self.max_retries)
def notification_callback(self, notification):
if not self.depends_on_resource:
return
try:
data = json.loads(notification.message)
except ValueError:
return
if 'value' not in data:
self.logger.warning(
"%s: missing 'value' in notification" %
self.__class__.__name__)
elif 'resource' not in data:
self.logger.warning(
"%s: missing 'resource' in notification" %
self.__class__.__name__)
elif data['resource'] == self.depends_on_resource:
do_collect_data = data['value'] > 0
if self.do_collect_data != do_collect_data:
# log only the transitions
self.logger.notice("%s: do_collect_data=%s" %
(self.__class__.__name__, do_collect_data))
self.do_collect_data = do_collect_data
def read_callback(self): def read_callback(self):
""" Read metrics and dispatch values """ Read metrics and dispatch values

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for getting statistics from Cinder # Collectd plugin for getting statistics from Cinder
import collectd import collectd
import base
import openstack import openstack
PLUGIN_NAME = 'cinder' PLUGIN_NAME = 'cinder'
@ -28,10 +30,7 @@ class CinderStatsPlugin(openstack.CollectdPlugin):
total size of volumes usable and in error state total size of volumes usable and in error state
""" """
def config_callback(self, config): @base.read_callback_wrapper
super(CinderStatsPlugin, self).config_callback(config)
@openstack.read_callback_wrapper
def read_callback(self): def read_callback(self):
volumes_details = self.get_objects_details('cinder', 'volumes') volumes_details = self.get_objects_details('cinder', 'volumes')

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for getting resource statistics from Glance # Collectd plugin for getting resource statistics from Glance
import collectd import collectd
import base
import openstack import openstack
PLUGIN_NAME = 'glance' PLUGIN_NAME = 'glance'
@ -28,10 +30,7 @@ class GlanceStatsPlugin(openstack.CollectdPlugin):
total size of images usable and in error state total size of images usable and in error state
""" """
def config_callback(self, config): @base.read_callback_wrapper
super(GlanceStatsPlugin, self).config_callback(config)
@openstack.read_callback_wrapper
def read_callback(self): def read_callback(self):
def is_snap(d): def is_snap(d):

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for getting statistics from Keystone # Collectd plugin for getting statistics from Keystone
import collectd import collectd
import base
import openstack import openstack
PLUGIN_NAME = 'keystone' PLUGIN_NAME = 'keystone'
@ -28,10 +30,7 @@ class KeystoneStatsPlugin(openstack.CollectdPlugin):
number of roles number of roles
""" """
def config_callback(self, config): @base.read_callback_wrapper
super(KeystoneStatsPlugin, self).config_callback(config)
@openstack.read_callback_wrapper
def read_callback(self): def read_callback(self):
def groupby(d): def groupby(d):

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for getting resource statistics from Neutron # Collectd plugin for getting resource statistics from Neutron
import collectd import collectd
import base
import openstack import openstack
PLUGIN_NAME = 'neutron' PLUGIN_NAME = 'neutron'
@ -31,10 +33,7 @@ class NeutronStatsPlugin(openstack.CollectdPlugin):
number of floating IP addresses broken down by free/associated number of floating IP addresses broken down by free/associated
""" """
def config_callback(self, config): @base.read_callback_wrapper
super(NeutronStatsPlugin, self).config_callback(config)
@openstack.read_callback_wrapper
def read_callback(self): def read_callback(self):
def groupby_network(x): def groupby_network(x):
return "networks.%s" % x.get('status', 'unknown').lower() return "networks.%s" % x.get('status', 'unknown').lower()

View File

@ -15,6 +15,8 @@
# #
# Collectd plugin for getting statistics from Nova # Collectd plugin for getting statistics from Nova
import collectd import collectd
import base
import openstack import openstack
PLUGIN_NAME = 'nova' PLUGIN_NAME = 'nova'
@ -27,10 +29,7 @@ class NovaStatsPlugin(openstack.CollectdPlugin):
number of instances broken down by state number of instances broken down by state
""" """
def config_callback(self, config): @base.read_callback_wrapper
super(NovaStatsPlugin, self).config_callback(config)
@openstack.read_callback_wrapper
def read_callback(self): def read_callback(self):
servers_details = self.get_objects_details('nova', 'servers') servers_details = self.get_objects_details('nova', 'servers')

View File

@ -60,7 +60,7 @@ class PacemakerResourcePlugin(base.Base):
'values': value 'values': value
} }
plugin = PacemakerResourcePlugin() plugin = PacemakerResourcePlugin(collectd)
def init_callback(): def init_callback():

View File

@ -212,7 +212,7 @@ class RabbitMqPlugin(base.Base):
yield {'type_instance': k, 'values': v} yield {'type_instance': k, 'values': v}
plugin = RabbitMqPlugin() plugin = RabbitMqPlugin(collectd)
def init_callback(): def init_callback():