diff --git a/collectd-extensions/centos/build_srpm.data b/collectd-extensions/centos/build_srpm.data
index 2fce999..11618ac 100644
--- a/collectd-extensions/centos/build_srpm.data
+++ b/collectd-extensions/centos/build_srpm.data
@@ -1,6 +1,7 @@
 SRC_DIR="$PKG_BASE"
 COPY_LIST="$PKG_BASE/src/LICENSE \
+           $PKG_BASE/src/README \
            $PKG_BASE/src/collectd.conf.pmon \
            $PKG_BASE/src/collectd.service \
            $PKG_BASE/src/fm_notifier.py \
diff --git a/collectd-extensions/centos/collectd-extensions.spec b/collectd-extensions/centos/collectd-extensions.spec
index bbc698e..4130372 100644
--- a/collectd-extensions/centos/collectd-extensions.spec
+++ b/collectd-extensions/centos/collectd-extensions.spec
@@ -15,11 +15,11 @@ Source2: collectd.conf.pmon
 # collectd python plugin files - notifiers
 Source3: fm_notifier.py
 Source5: plugin_common.py
+Source6: README

 # collectd python plugin files - resource plugins
 Source11: cpu.py
 Source12: memory.py
-Source14: example.py
 Source15: ntpq.py
 Source16: interface.py
 Source17: remotels.py
@@ -31,7 +31,6 @@ Source100: python_plugins.conf
 Source101: cpu.conf
 Source102: memory.conf
 Source103: df.conf
-Source104: example.conf
 Source105: ntpq.conf
 Source106: interface.conf
 Source107: remotels.conf
@@ -55,7 +54,8 @@ StarlingX collectd extensions
 %define debug_package %{nil}

 %define local_unit_dir %{_sysconfdir}/systemd/system
-%define local_plugin_dir %{_sysconfdir}/collectd.d
+%define local_default_plugin_dir %{_sysconfdir}/collectd.d
+%define local_starlingx_plugin_dir %{_sysconfdir}/collectd.d/starlingx
 %define local_python_extensions_dir /opt/collectd/extensions/python
 %define local_config_extensions_dir /opt/collectd/extensions/config

@@ -67,7 +67,8 @@ StarlingX collectd extensions
 %install
 install -m 755 -d %{buildroot}%{_sysconfdir}
 install -m 755 -d %{buildroot}%{local_unit_dir}
-install -m 755 -d %{buildroot}%{local_plugin_dir}
+install -m 755 -d %{buildroot}%{local_default_plugin_dir}
+install -m 755 -d %{buildroot}%{local_starlingx_plugin_dir}
 install -m 755 -d %{buildroot}%{local_config_extensions_dir}
 install -m 755 -d %{buildroot}%{local_python_extensions_dir}

@@ -79,10 +80,12 @@ install -m 600 %{SOURCE2} %{buildroot}%{local_config_extensions_dir}
 install -m 700 %{SOURCE3} %{buildroot}%{local_python_extensions_dir}
 install -m 700 %{SOURCE5} %{buildroot}%{local_python_extensions_dir}

+# install README file into /etc/collectd.d
+install -m 644 %{SOURCE6} %{buildroot}%{local_default_plugin_dir}
+
 # collectd python plugin files - resource plugins
 install -m 700 %{SOURCE11} %{buildroot}%{local_python_extensions_dir}
 install -m 700 %{SOURCE12} %{buildroot}%{local_python_extensions_dir}
-install -m 700 %{SOURCE14} %{buildroot}%{local_python_extensions_dir}
 install -m 700 %{SOURCE15} %{buildroot}%{local_python_extensions_dir}
 install -m 700 %{SOURCE16} %{buildroot}%{local_python_extensions_dir}
 install -m 700 %{SOURCE17} %{buildroot}%{local_python_extensions_dir}
@@ -90,17 +93,16 @@ install -m 700 %{SOURCE18} %{buildroot}%{local_python_extensions_dir}
 install -m 700 %{SOURCE19} %{buildroot}%{local_python_extensions_dir}

-# collectd plugin conf files into /etc/collectd.d
-install -m 600 %{SOURCE100} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE101} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE102} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE103} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE104} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE105} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE106} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE107} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE108} %{buildroot}%{local_plugin_dir}
-install -m 600 %{SOURCE109} %{buildroot}%{local_plugin_dir}
+# collectd plugin conf files into /etc/collectd.d/starlingx
+install -m 600 %{SOURCE100} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE101} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE102} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE103} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE105} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE106} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE107} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE108} %{buildroot}%{local_starlingx_plugin_dir}
+install -m 600 %{SOURCE109} %{buildroot}%{local_starlingx_plugin_dir}

 %clean
 rm -rf $RPM_BUILD_ROOT
@@ -108,6 +110,8 @@ rm -rf $RPM_BUILD_ROOT
 %files
 %defattr(-,root,root,-)
 %config(noreplace) %{local_unit_dir}/collectd.service
-%{local_plugin_dir}/*
+%{local_default_plugin_dir}/*
+%dir %{local_starlingx_plugin_dir}
+%{local_starlingx_plugin_dir}/*
 %{local_config_extensions_dir}/*
 %{local_python_extensions_dir}/*
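For orientation, the runtime layout this spec produces is sketched below. The
starlingx subdirectory receives the Source100..Source109 conf files listed
above (example.conf is dropped); the Source108/Source109 file names are not
visible in these hunks, so a placeholder stands in for them.

    /etc/collectd.d/
        README                   (Source6, mode 644)
        starlingx/
            python_plugins.conf  (Source100..Source109, mode 600)
            cpu.conf
            memory.conf
            df.conf
            ntpq.conf
            interface.conf
            remotels.conf
            ...                  (Source108/Source109, names not shown here)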
diff --git a/collectd-extensions/src/README b/collectd-extensions/src/README
new file mode 100644
index 0000000..79c5c36
--- /dev/null
+++ b/collectd-extensions/src/README
@@ -0,0 +1,6 @@
+The upstream default plugin loader files in this
+directory are not used.
+
+The /etc/collectd.conf file is updated to load its
+plugins from /etc/collectd.d/starlingx while the
+starlingx collectd-extensions package is installed.
diff --git a/collectd-extensions/src/collectd.service b/collectd-extensions/src/collectd.service
index 1ac7cb0..f2b62cd 100644
--- a/collectd-extensions/src/collectd.service
+++ b/collectd-extensions/src/collectd.service
@@ -3,6 +3,7 @@ Description=Collectd statistics daemon and extension services
 Documentation=man:collectd(1) man:collectd.conf(5)
 Before=pmon.service
 After=local-fs.target network-online.target
+After=config.service syslog.service
 Requires=local-fs.target network-online.target

 [Service]
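The README above says /etc/collectd.conf is repointed at
/etc/collectd.d/starlingx; the fm_notifier.py change further down discovers
that path at init time by scanning the conf file backwards for the last
'Include' line. A minimal standalone sketch of that discovery, run against a
hypothetical conf tail (the real input is /etc/collectd.conf):

    # Hypothetical tail of /etc/collectd.conf after the StarlingX update;
    # only the parse loop mirrors the fm_notifier.py hunk below.
    conf_lines = [
        'LoadPlugin python\n',
        'Include "/etc/collectd.d/starlingx"\n',
    ]

    plugin_path = None
    for line in reversed(conf_lines):
        if line.startswith('Include'):
            # 'Include "/etc/collectd.d/starlingx"' -> '/etc/collectd.d/starlingx/'
            plugin_path = line.split(' ')[1].strip("\n").strip('"') + '/'
            break

    print(plugin_path)  # /etc/collectd.d/starlingx/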
diff --git a/collectd-extensions/src/cpu.py b/collectd-extensions/src/cpu.py
index cc05323..123cfa5 100755
--- a/collectd-extensions/src/cpu.py
+++ b/collectd-extensions/src/cpu.py
@@ -22,6 +22,7 @@ import plugin_common as pc
 import re
 import socket
 import time
+import tsconfig.tsconfig as tsc

 PLUGIN = 'platform cpu usage plugin'
 PLUGIN_DEBUG = 'DEBUG platform cpu'
@@ -413,7 +414,11 @@ def init_func():

     # do nothing till config is complete.
     if obj.config_complete() is False:
-        return False
+        return pc.PLUGIN_PASS
+
+    if obj._node_ready is False:
+        obj.node_ready()
+        return pc.PLUGIN_PASS

     obj.hostname = socket.gethostname()
@@ -472,7 +477,7 @@ def read_func():

     if obj.init_complete is False:
         init_func()
-        return 0
+        return pc.PLUGIN_PASS

     # epoch time in floating seconds
     now0 = time.time()
diff --git a/collectd-extensions/src/example.py b/collectd-extensions/src/example.py
index 50ad8c4..11767da 100755
--- a/collectd-extensions/src/example.py
+++ b/collectd-extensions/src/example.py
@@ -61,6 +61,10 @@ def read_func():
         init_func()
         return 0

+    if obj._node_ready is False:
+        obj.node_ready()
+        return 0
+
     # do the work to create the sample
     low = int(obj.plugin_data[0])
     high = int(obj.plugin_data[1])
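The three-line gate just added to cpu.py and example.py reappears in most
read functions in this change. A runnable sketch of the pattern, with a stub
standing in for plugin_common.PluginObject (the stub models only the
node-ready counting and assumes a controller nodetype):

    class StubPluginObject(object):
        """Models only the node-ready counting of plugin_common.PluginObject."""

        def __init__(self, threshold=3):
            self._node_ready = False
            self.node_ready_count = 0
            self.node_ready_threshold = threshold

        def node_ready(self):
            # count up to the threshold before declaring the node ready
            self.node_ready_count += 1
            if self.node_ready_count < self.node_ready_threshold:
                return False
            self._node_ready = True
            return True

    obj = StubPluginObject()

    def read_func():
        # the gate each plugin's read_func now starts with
        if obj._node_ready is False:
            obj.node_ready()
            return 0
        return 42  # placeholder for the real sampling work

    for audit in range(5):
        print(audit, read_func())
    # audits 0-2 print 0 while the count ramps ; audits 3-4 do real work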
diff --git a/collectd-extensions/src/fm_notifier.py b/collectd-extensions/src/fm_notifier.py
index d2a5297..b4a3776 100755
--- a/collectd-extensions/src/fm_notifier.py
+++ b/collectd-extensions/src/fm_notifier.py
@@ -105,8 +105,9 @@ debug_lists = False
 want_state_audit = False
 want_vswitch = False

-# number of notifier loops before the state is object dumped
-DEBUG_AUDIT = 2
+# Number of notifier loops between each audit.
+# At a 30 second notifier interval the audit runs every 5 minutes.
+AUDIT_RATE = 10

 # write a 'value' log on a the resource sample change of more than this amount
 LOG_STEP = 10
@@ -122,9 +123,6 @@ PLUGIN = 'alarm notifier'
 # This plugin's degrade function
 PLUGIN_DEGRADE = 'degrade notifier'

-# Path to the plugin's drop dir
-PLUGIN_PATH = '/etc/collectd.d/'
-
 # the name of the collectd samples database
 DATABASE_NAME = 'collectd samples'
@@ -194,7 +192,6 @@ DF_MANGLED_DICT = {
 ALARM_ID__CPU = "100.101"
 ALARM_ID__MEM = "100.103"
 ALARM_ID__DF = "100.104"
-ALARM_ID__EXAMPLE = "100.113"

 ALARM_ID__VSWITCH_CPU = "100.102"
 ALARM_ID__VSWITCH_MEM = "100.115"
@@ -209,8 +206,11 @@ ALARM_ID_LIST = [ALARM_ID__CPU,
                  ALARM_ID__VSWITCH_CPU,
                  ALARM_ID__VSWITCH_MEM,
                  ALARM_ID__VSWITCH_PORT,
-                 ALARM_ID__VSWITCH_IFACE,
-                 ALARM_ID__EXAMPLE]
+                 ALARM_ID__VSWITCH_IFACE]
+
+AUDIT_ALARM_ID_LIST = [ALARM_ID__CPU,
+                       ALARM_ID__MEM,
+                       ALARM_ID__DF]

 # ADD_NEW_PLUGIN: add plugin name definition
 # WARNING: This must line up exactly with the plugin
@@ -224,7 +224,6 @@ PLUGIN__VSWITCH_PORT = "vswitch_port"
 PLUGIN__VSWITCH_CPU = "vswitch_cpu"
 PLUGIN__VSWITCH_MEM = "vswitch_mem"
 PLUGIN__VSWITCH_IFACE = "vswitch_iface"
-PLUGIN__EXAMPLE = "example"

 # ADD_NEW_PLUGIN: add plugin name to list
 PLUGIN_NAME_LIST = [PLUGIN__CPU,
@@ -233,8 +232,7 @@ PLUGIN_NAME_LIST = [PLUGIN__CPU,
                     PLUGIN__VSWITCH_CPU,
                     PLUGIN__VSWITCH_MEM,
                     PLUGIN__VSWITCH_PORT,
-                    PLUGIN__VSWITCH_IFACE,
-                    PLUGIN__EXAMPLE]
+                    PLUGIN__VSWITCH_IFACE]

 # Used to find plugin name based on alarm id
 # for managing degrade for startup alarms.
@@ -554,9 +552,7 @@ class fmAlarmObject:
     lock = None                    # global lock for mread_func mutex
     database_setup = False         # state of database setup
     database_setup_in_progress = False  # connection mutex
-
-    # Set to True once FM connectivity is verified
-    # Used to ensure alarms are queried on startup
+    plugin_path = None
     fm_connectivity = False

     def __init__(self, id, plugin):
@@ -625,15 +621,17 @@ class fmAlarmObject:
         # total notification count
         self.count = 0

-        # Debug: state audit controls
-        self.audit_threshold = 0
-        self.audit_count = 0
+        # audit counters
+        self.alarm_audit_threshold = 0
+        self.state_audit_count = 0

         # For plugins that have multiple instances like df (filesystem plugin)
         # we need to create an instance of this object for each one.
         # This dictionary is used to associate an instance with its object.
         self.instance_objects = {}

+        self.fault = None
+
     def _ilog(self, string):
         """Create a collectd notifier info log with the string param"""
         collectd.info('%s %s : %s' % (PLUGIN, self.plugin, string))
@@ -667,18 +665,18 @@ class fmAlarmObject:
         if self.id == ALARM_ID__CPU:
             _print_state()

-        self.audit_count += 1
+        self.state_audit_count += 1
         if self.warnings:
             collectd.info("%s AUDIT %d: %s warning list %s:%s" %
                           (PLUGIN,
-                           self.audit_count,
+                           self.state_audit_count,
                            self.plugin,
                            location,
                            self.warnings))
         if self.failures:
             collectd.info("%s AUDIT %d: %s failure list %s:%s" %
                           (PLUGIN,
-                           self.audit_count,
+                           self.state_audit_count,
                            self.plugin,
                            location,
                            self.failures))
@@ -1245,7 +1243,7 @@ class fmAlarmObject:
         if self.id == ALARM_ID__DF:

             # read the df.conf file and return/get a list of mount points
-            conf_file = PLUGIN_PATH + 'df.conf'
+            conf_file = fmAlarmObject.plugin_path + 'df.conf'
             if not os.path.exists(conf_file):
                 collectd.error("%s cannot create filesystem "
                                "instance objects ; missing : %s" %
@@ -1312,8 +1310,7 @@ PLUGINS = {
     PLUGIN__VSWITCH_PORT: fmAlarmObject(ALARM_ID__VSWITCH_PORT,
                                         PLUGIN__VSWITCH_PORT),
     PLUGIN__VSWITCH_IFACE: fmAlarmObject(ALARM_ID__VSWITCH_IFACE,
-                                         PLUGIN__VSWITCH_IFACE),
-    PLUGIN__EXAMPLE: fmAlarmObject(ALARM_ID__EXAMPLE, PLUGIN__EXAMPLE)}
+                                         PLUGIN__VSWITCH_IFACE)}

 #####################################################################
@@ -1431,7 +1428,7 @@ def _build_entity_id(plugin, plugin_instance):

 def _get_df_mountpoints():

-    conf_file = PLUGIN_PATH + 'df.conf'
+    conf_file = fmAlarmObject.plugin_path + 'df.conf'
     if not os.path.exists(conf_file):
         collectd.error("%s cannot create filesystem "
                        "instance objects ; missing : %s" %
@@ -1471,7 +1468,7 @@ def _print_obj(obj):
         collectd.info("%s %s %s - %s - %s\n" %
                       (PLUGIN, prefix, obj.resource_name, obj.plugin, obj.id))
-
+    collectd.info("%s %s fault obj: %s\n" % (PLUGIN, prefix, obj.fault))
     collectd.info("%s %s entity id: %s\n" % (PLUGIN, prefix, obj.entity_id))
     collectd.info("%s %s degrade_id: %s\n" % (PLUGIN, prefix, obj.degrade_id))
@@ -1652,6 +1649,16 @@ def init_func():
     collectd.info("%s %s:%s init function" %
                   (PLUGIN, tsc.nodetype, fmAlarmObject.host))

+    # The path where collectd looks for its plugins is specified
+    # at the end of the /etc/collectd.conf file.
+    # Because of this, we search for the 'Include' label in reverse order.
+    for line in reversed(open("/etc/collectd.conf", 'r').readlines()):
+        if line.startswith('Include'):
+            plugin_path = line.split(' ')[1].strip("\n").strip('"') + '/'
+            fmAlarmObject.plugin_path = plugin_path
+            collectd.info("plugin path: %s" % fmAlarmObject.plugin_path)
+            break
+
     # Constant CPU Plugin Object Settings
     obj = PLUGINS[PLUGIN__CPU]
     obj.resource_name = "Platform CPU"
@@ -1744,12 +1751,6 @@ def init_func():

     ###########################################################################

-    obj = PLUGINS[PLUGIN__EXAMPLE]
-    obj.resource_name = "Example"
-    obj.instance_name = PLUGIN__EXAMPLE
-    obj.repair = "Not Applicable"
-    collectd.info("%s monitoring %s usage" % (PLUGIN, obj.resource_name))
-
     # ...
     # ADD_NEW_PLUGIN: Add new plugin object initialization here ...
     # ...
@@ -1772,8 +1773,17 @@ def notifier_func(nObject):
     if pluginObject.config_complete() is False:
         return 0

-    if fmAlarmObject.fm_connectivity is False:
+    if pluginObject._node_ready is False:
+        collectd.info("%s %s not ready ; from:%s:%s:%s" %
+                      (PLUGIN,
+                       fmAlarmObject.host,
+                       nObject.host,
+                       nObject.plugin,
+                       nObject.plugin_instance))
+        pluginObject.node_ready()
+        return 0

+    if fmAlarmObject.fm_connectivity is False:
         # handle multi threading startup
         with fmAlarmObject.lock:
             if fmAlarmObject.fm_connectivity is True:
@@ -1791,8 +1801,12 @@ def notifier_func(nObject):
             try:
                 alarms = api.get_faults_by_id(alarm_id)
             except Exception as ex:
-                collectd.error("%s 'get_faults_by_id' exception ; %s" %
-                               (PLUGIN, ex))
+                collectd.warning("%s 'get_faults_by_id' exception ; %s" %
+                                 (PLUGIN, ex))
+
+                # if fm is not responding then the node is not ready
+                pluginObject._node_ready = False
+                pluginObject.node_ready_count = 0
                 return 0

             if alarms:
@@ -1810,7 +1824,7 @@ def notifier_func(nObject):
                         if eid.split(base_eid)[1]:
                             want_alarm_clear = True

-                    collectd.info('%s found %s %s alarm [%s]' %
+                    collectd.info('%s alarm %s:%s:%s found at startup' %
                                   (PLUGIN,
                                    alarm.severity,
                                    alarm_id,
@@ -1818,8 +1832,9 @@ def notifier_func(nObject):

                     if want_alarm_clear is True:
                         if clear_alarm(alarm_id, eid) is False:
-                            collectd.error("%s %s:%s clear failed" %
-                                           (PLUGIN,
+                            collectd.error("%s alarm %s:%s:%s clear "
+                                           "failed" %
+                                           (PLUGIN,
                                             alarm.severity,
                                             alarm_id,
                                             eid))
                         continue
@@ -1861,7 +1876,7 @@ def notifier_func(nObject):
                                           (PLUGIN_DEGRADE, eid, alarm_id))

         fmAlarmObject.fm_connectivity = True
-        collectd.info("%s connectivity with fm complete" % PLUGIN)
+        collectd.info("%s node ready" % PLUGIN)

     collectd.debug('%s notification: %s %s:%s - %s %s %s [%s]' % (
         PLUGIN,
@@ -1975,15 +1990,6 @@ def notifier_func(nObject):
     #     if obj.warnings or obj.failures:
     #         _print_state(obj)

-    # If want_state_audit is True then run the audit.
-    # Primarily used for debug
-    # default state is False
-    if want_state_audit:
-        obj.audit_threshold += 1
-        if obj.audit_threshold == DEBUG_AUDIT:
-            obj.audit_threshold = 0
-            obj._state_audit("audit")
-
     # manage reading value change ; store last and log if gt obj.step
     action = obj.manage_change(nObject)
     if action == "done":
@@ -2006,6 +2012,83 @@ def notifier_func(nObject):
         if len(mtcDegradeObj.degrade_list):
             mtcDegradeObj.remove_degrade_for_missing_filesystems()

+    obj.alarm_audit_threshold += 1
+    if obj.alarm_audit_threshold >= AUDIT_RATE:
+        if want_state_audit:
+            obj._state_audit("audit")
+        obj.alarm_audit_threshold = 0
+
+        #################################################################
+        #
+        # Audit Asserted Alarms
+        #
+        # Loop over the list of auditable alarm ids building two
+        # dictionaries, one containing warning (major) and the other
+        # failure (critical) with alarm info needed to detect and
+        # correct stale, missing or severity mismatched alarms for
+        # the listed alarm ids <100.xxx>.
+        #
+        # Note: Conversion in terminology from
+        #           warning -> major and
+        #           failure -> critical
+        #       is done because fm speaks in terms of major and critical
+        #       while the plugin speaks in terms of warning and failure.
+        #
+        major_alarm_dict = {}
+        critical_alarm_dict = {}
+        for alarm_id in AUDIT_ALARM_ID_LIST:
+
+            tmp_base_obj = get_base_object(alarm_id)
+            if tmp_base_obj is None:
+                collectd.error("%s audit %s base object lookup failed" %
+                               (PLUGIN, alarm_id))
+                continue
+
+            # Build 2 dictionaries containing current alarmed info.
+            # Dictionary entries are indexed by entity id to fetch the
+            # alarm id and last fault object used to create the alarm
+            # for the mismatch and missing case handling.
+            #
+            # { eid : { alarm : <alarm id> , fault : <fault obj> } , ... }
+
+            # major list for base object from warnings list
+            if tmp_base_obj.entity_id in tmp_base_obj.warnings:
+                info = {}
+                info[pc.AUDIT_INFO_ALARM] = alarm_id
+                info[pc.AUDIT_INFO_FAULT] = tmp_base_obj.fault
+                major_alarm_dict[tmp_base_obj.entity_id] = info
+
+            # major list for instance objects from warnings list
+            for _inst_obj in tmp_base_obj.instance_objects:
+                inst_obj = tmp_base_obj.instance_objects[_inst_obj]
+                if inst_obj.entity_id in tmp_base_obj.warnings:
+                    info = {}
+                    info[pc.AUDIT_INFO_ALARM] = alarm_id
+                    info[pc.AUDIT_INFO_FAULT] = inst_obj.fault
+                    major_alarm_dict[inst_obj.entity_id] = info
+
+            # critical list for base object from failures list
+            if tmp_base_obj.entity_id in tmp_base_obj.failures:
+                info = {}
+                info[pc.AUDIT_INFO_ALARM] = alarm_id
+                info[pc.AUDIT_INFO_FAULT] = tmp_base_obj.fault
+                critical_alarm_dict[tmp_base_obj.entity_id] = info
+
+            # critical list for instance objects from failures list
+            for _inst_obj in tmp_base_obj.instance_objects:
+                inst_obj = tmp_base_obj.instance_objects[_inst_obj]
+                if inst_obj.entity_id in tmp_base_obj.failures:
+                    info = {}
+                    info[pc.AUDIT_INFO_ALARM] = alarm_id
+                    info[pc.AUDIT_INFO_FAULT] = inst_obj.fault
+                    critical_alarm_dict[inst_obj.entity_id] = info
+
+        pluginObject.alarms_audit(api, AUDIT_ALARM_ID_LIST,
+                                  major_alarm_dict,
+                                  critical_alarm_dict)
+        # end alarms audit
+        #################################################################
+
     # exit early if there is no alarm update to be made
     if obj.debounce(base_obj,
                     obj.entity_id,
@@ -2046,7 +2129,7 @@ def notifier_func(nObject):
             reason = obj.reason_warning

         # build the alarm object
-        fault = fm_api.Fault(
+        obj.fault = fm_api.Fault(
             alarm_id=obj.id,
             alarm_state=_alarm_state,
             entity_type_id=fm_constants.FM_ENTITY_TYPE_HOST,
@@ -2060,7 +2143,7 @@ def notifier_func(nObject):
             suppression=base_obj.suppression)

         try:
-            alarm_uuid = api.set_fault(fault)
+            alarm_uuid = api.set_fault(obj.fault)
             if pc.is_uuid_like(alarm_uuid) is False:
                 collectd.error("%s 'set_fault' failed ; %s:%s ; %s" %
                                (PLUGIN,
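For reference, the dictionaries handed to alarms_audit() above have exactly
the shape documented in the hunk. A small sketch with hypothetical entity ids
and a placeholder fault object (the real values are fm_api.Fault instances
saved in obj.fault):

    AUDIT_INFO_ALARM = 'alarm'   # mirrors the new plugin_common constants
    AUDIT_INFO_FAULT = 'fault'

    saved_fault = object()       # placeholder for an fm_api.Fault instance

    # { eid : { 'alarm' : <alarm id> , 'fault' : <fault obj> } , ... }
    major_alarm_dict = {
        'host=controller-0.filesystem=/var/log': {   # hypothetical eid
            AUDIT_INFO_ALARM: '100.104',             # df alarm id from above
            AUDIT_INFO_FAULT: saved_fault,
        },
    }
    critical_alarm_dict = {
        'host=controller-0': {                       # hypothetical eid
            AUDIT_INFO_ALARM: '100.101',             # cpu alarm id from above
            AUDIT_INFO_FAULT: saved_fault,
        },
    }

    # alarms_audit (added to plugin_common.py below) then clears stale
    # alarms, refreshes missing ones from the saved fault, and corrects
    # severity mismatches against these two dictionaries.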
diff --git a/collectd-extensions/src/interface.py b/collectd-extensions/src/interface.py
index 0a2022c..e705329 100755
--- a/collectd-extensions/src/interface.py
+++ b/collectd-extensions/src/interface.py
@@ -829,6 +829,10 @@ def read_func():
         init_func()
         return 0

+    if obj._node_ready is False:
+        obj.node_ready()
+        return 0
+
     if obj.phase < RUN_PHASE__ALARMS_CLEARED:

         # clear all alarms on first audit
diff --git a/collectd-extensions/src/memory.py b/collectd-extensions/src/memory.py
index a163a7f..25f5915 100755
--- a/collectd-extensions/src/memory.py
+++ b/collectd-extensions/src/memory.py
@@ -373,6 +373,9 @@ def init_func():
     if obj.config_complete() is False:
         return 0

+    # override node ready threshold for this plugin
+    obj.node_ready_threshold = 1
+
     obj.hostname = socket.gethostname()
     collectd.info('%s: init function for %s' % (PLUGIN, obj.hostname))
@@ -398,6 +401,10 @@ def read_func():
         init_func()
         return 0

+    if obj._node_ready is False:
+        obj.node_ready()
+        return 0
+
     # Get epoch time in floating seconds
     now0 = time.time()
diff --git a/collectd-extensions/src/ntpq.py b/collectd-extensions/src/ntpq.py
index 166f513..5ef55d7 100755
--- a/collectd-extensions/src/ntpq.py
+++ b/collectd-extensions/src/ntpq.py
@@ -1,5 +1,5 @@
 ############################################################################
-# Copyright (c) 2018-2019 Wind River Systems, Inc.
+# Copyright (c) 2018-2021 Wind River Systems, Inc.
 #
 # SPDX-License-Identifier: Apache-2.0
 #
@@ -166,15 +166,9 @@ def _raise_alarm(ip=None):
         if obj.alarm_raised is True:
             return False

-        if obj.peer_selected:
-            reason = "NTP cannot reach external time source; " \
-                     "syncing with peer controller only"
-            fm_severity = fm_constants.FM_ALARM_SEVERITY_MINOR
-        else:
-            reason = "NTP configuration does not contain any valid "
-            reason += "or reachable NTP servers."
-            fm_severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
-
+        reason = "NTP configuration does not contain any valid "
+        reason += "or reachable NTP servers."
+        fm_severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
         eid = obj.base_eid

     else:
@@ -555,6 +549,10 @@ def init_func():
     if obj.config_complete() is False:
         return 0

+    if obj._node_ready is False:
+        obj.node_ready()
+        return 0
+
     # get current hostname
     obj.hostname = obj.gethostname()
     if not obj.hostname:
@@ -619,6 +617,59 @@ def init_func():
     return 0


+###############################################################################
+#
+# Name       : _get_one_line_data
+#
+# Description: The raw ntpq response may split one record across two lines.
+#              This function concatenates such a pair back into a single line.
+#
+# Example:
+#
+#      remote           refid      st t when poll reach   delay   offset  jitter
+# ==============================================================================
+#  192.168.204.3
+#                  127.0.0.1       12 u    3   64  373    0.003    2.711   0.847
+# +162.159.200.1
+#                  10.44.9.236      3 u   25   64  377   29.304   -1.645  11.298
+# *162.159.200.123
+#                  10.44.9.236      3 u   24   64  377   27.504   -0.865   9.443
+#
+# To
+#
+#      remote           refid      st t when poll reach   delay   offset  jitter
+# ==============================================================================
+#  192.168.204.3    127.0.0.1      12 u    3   64  373    0.003    2.711   0.847
+# +162.159.200.1    10.44.9.236     3 u   25   64  377   29.304   -1.645  11.298
+# *162.159.200.123  10.44.9.236     3 u   24   64  377   27.504   -0.865   9.443
+#
+# (the refid column may be preceded by extra whitespace)
+#
+# Parameters : The original ntpq response, split into a list of lines
+#
+# Returns    : The ntpq response with one line per record
+#
+###############################################################################
+def _get_one_line_data(ntpq_lines):
+    header = ntpq_lines[0].split()
+    header_col_num = len(header)
+    list_oneline = list(ntpq_lines[:2])
+
+    line_skip = False
+    for i in range(2, len(ntpq_lines)):
+        if len(ntpq_lines[i]) > 0:
+            if line_skip is False:
+                one_line = ntpq_lines[i]
+                cols_num = len(ntpq_lines[i].split())
+                # guard the lookahead so a short final line cannot index
+                # past the end of the list
+                if cols_num != header_col_num and (i + 1) < len(ntpq_lines):
+                    next_cols_num = len(ntpq_lines[i + 1].split())
+                    if header_col_num == (cols_num + next_cols_num):
+                        one_line += ntpq_lines[i + 1]
+                        line_skip = True
+                list_oneline.append(one_line)
+            else:
+                line_skip = False
+    return list_oneline
+
+
 ###############################################################################
 #
 # Name       : read_func
@@ -674,7 +725,7 @@ def read_func():
         return 0

     # Get the ntp query output into a list of lines
-    obj.ntpq = data.split('\n')
+    obj.ntpq = _get_one_line_data(data.split('\n'))

     # keep track of changes ; only log on changes
     reachable_list_changed = False
@@ -753,6 +804,7 @@ def read_func():
                     collectd.debug("%s selected server is '%s'" %
                                    (PLUGIN, obj.selected_server))
             else:
+                collectd.info("%s selected server is peer" % PLUGIN)
                 # refer to peer
                 refid = ''
                 for i in range(1, len(cols)):
@@ -762,12 +814,12 @@ def read_func():

             if refid not in ('', '127.0.0.1') and \
                     not _is_controller(refid) and \
-                    socket.AF_INET == ip_family:
+                    socket.AF_INET == _is_ip_address(refid):
                 # ipv4, peer controller refer to a time source is not
                 # itself or a controller (this node)
                 obj.selected_server = ip
-                collectd.debug("peer controller has a reliable "
-                               "source")
+                collectd.info("%s peer controller has a reliable "
+                              "source: %s" % (PLUGIN, refid))

             # anything else is unreachable
             else:
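A quick usage sketch for _get_one_line_data, fed the split-record sample from
its own comment block (assumes the function is importable as defined above):

    ntpq_lines = [
        "     remote           refid      st t when poll reach   delay   offset  jitter",
        "==============================================================================",
        " 192.168.204.3",
        "                 127.0.0.1       12 u    3   64  373    0.003    2.711   0.847",
        "+162.159.200.1",
        "                 10.44.9.236      3 u   25   64  377   29.304   -1.645  11.298",
    ]

    for record in _get_one_line_data(ntpq_lines):
        print(record)
    # the header and separator pass through unchanged ; each remote and its
    # wrapped detail line are re-joined into a single record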
diff --git a/collectd-extensions/src/ovs_interface.py b/collectd-extensions/src/ovs_interface.py
index d2877c6..7033f66 100755
--- a/collectd-extensions/src/ovs_interface.py
+++ b/collectd-extensions/src/ovs_interface.py
@@ -903,6 +903,10 @@ def read_func():
         init_func()
         return 0

+    if obj._node_ready is False:
+        obj.node_ready()
+        return 0
+
     if obj.phase < RUN_PHASE__ALARMS_CLEARED:

         # clear all alarms on first audit
diff --git a/collectd-extensions/src/plugin_common.py b/collectd-extensions/src/plugin_common.py
index 934de28..5242f3a 100644
--- a/collectd-extensions/src/plugin_common.py
+++ b/collectd-extensions/src/plugin_common.py
@@ -81,7 +81,8 @@ BASE_GROUPS = [CGROUP_DOCKER, CGROUP_SYSTEM, CGROUP_USER]
 BASE_GROUPS_EXCLUDE = [CGROUP_K8S, CGROUP_MACHINE]

 # Groupings of pods by kubernetes namespace
-K8S_NAMESPACE_SYSTEM = ['kube-system']
+K8S_NAMESPACE_SYSTEM = ['kube-system', 'armada', 'cert-manager', 'portieris',
+                        'vault', 'notification']
 K8S_NAMESPACE_ADDON = ['monitor', 'openstack']

 # Pod parent cgroup name based on annotation.
@@ -97,6 +98,9 @@ RESERVED_CPULIST_KEY = 'PLATFORM_CPU_LIST'
 PLUGIN_PASS = 0
 PLUGIN_FAIL = 1

+AUDIT_INFO_ALARM = 'alarm'
+AUDIT_INFO_FAULT = 'fault'
+

 class PluginObject(object):

@@ -114,7 +118,7 @@ class PluginObject(object):
         self._config_complete = False  # set to True once config is complete
         self.config_done = False       # set true if config_func completed ok
         self.init_complete = False     # set true if init_func completed ok
-        self.fm_connectivity = False   # set true when fm connectivity ok
+        self._node_ready = False       # set true when node is ready

         self.alarm_type = fm_constants.FM_ALARM_TYPE_7     # OPERATIONAL
         self.cause = fm_constants.ALARM_PROBABLE_CAUSE_50  # THRESHOLD CROSS
@@ -146,9 +150,8 @@ class PluginObject(object):
         self.http_retry_count = 0     # track http error cases
         self.HTTP_RETRY_THROTTLE = 6  # http retry threshold
         self.phase = 0                # tracks current phase; init, sampling
-
-        collectd.debug("%s Common PluginObject constructor [%s]" %
-                       (plugin, url))
+        self.node_ready_threshold = 3  # wait for node ready before sampling
+        self.node_ready_count = 0      # node ready count up counter

     ###########################################################################
     #
@@ -162,8 +165,10 @@ class PluginObject(object):

     def init_completed(self):
         """Declare plugin init complete"""
-
-        collectd.info("%s initialization completed" % self.plugin)
+        self.hostname = self.gethostname()
+        self.base_eid = 'host=' + self.hostname
+        collectd.info("%s %s initialization completed" %
+                      (self.plugin, self.hostname))
         self.init_complete = True

     ###########################################################################
@@ -206,6 +211,35 @@ class PluginObject(object):

         return True

+    ###########################################################################
+    #
+    # Name       : node_ready
+    #
+    # Description: Test for node ready condition.
+    #              Currently this is just a thresholded count.
+    #
+    # Parameters : None
+    #
+    # Returns    : False if node is not ready
+    #              True if node is ready
+    #
+    ###########################################################################

+    def node_ready(self):
+        """Check for node ready state"""
+
+        if tsc.nodetype == 'controller':
+            self.node_ready_count += 1
+            if self.node_ready_count < self.node_ready_threshold:
+                collectd.info("%s node ready count %d of %d" %
+                              (self.plugin,
+                               self.node_ready_count,
+                               self.node_ready_threshold))
+                return False
+
+        self._node_ready = True
+        return True
+
     ###########################################################################
     #
     # Name       : gethostname
@@ -320,6 +354,230 @@ class PluginObject(object):

         return True

+    #####################################################################
+    #
+    # Name       : clear_alarm
+    #
+    # Description: Clear the specified alarm.
+    #
+    # Returns    : True if the operation succeeded
+    #              False if there was an error exception.
+    #
+    # Assumptions: Caller can decide to retry based on return status.
+    #
+    #####################################################################
+    def clear_alarm(self, fm, alarm_id, eid):
+        """Clear the specified alarm:eid
+
+        :param fm The Fault Manager's API Object
+        :param alarm_id The alarm identifier , ie 100.103
+        :param eid The entity identifier ; host=<hostname>.<suffix>
+        """
+
+        try:
+            if fm.clear_fault(alarm_id, eid) is True:
+                collectd.info("%s %s:%s alarm cleared" %
+                              (self.plugin, alarm_id, eid))
+            else:
+                collectd.info("%s %s:%s alarm already cleared" %
+                              (self.plugin, alarm_id, eid))
+            return True
+
+        except Exception as ex:
+            collectd.error("%s 'clear_fault' exception ; %s:%s ; %s" %
+                           (self.plugin, alarm_id, eid, ex))
+            return False
+
+    #########################################################################
+    #
+    # Name   : __missing_or_mismatch_alarm_handler
+    #
+    # Purpose: Find and correct missing or mismatched alarms
+    #
+    # Scope  : Private
+    #
+    #########################################################################
+    def __missing_or_mismatch_alarm_handler(self,
+                                            fm,
+                                            alarms,
+                                            alarm_id,
+                                            severity,
+                                            sev_alarm_dict):
+        """Find and correct missing or mismatched alarms
+
+        :param fm The Fault Manager's API Object
+        :param alarms List of database alarms for alarm id and this host
+        :param alarm_id The alarm id in context
+        :param severity Specifies the severity level of sev_alarm_dict
+        :param sev_alarm_dict An alarm dictionary for either (not both) major
+                              or critical alarms
+        """
+        plugin_prefix = self.plugin + ' audit'
+        for eid in sev_alarm_dict:
+            found = False
+            if alarm_id == sev_alarm_dict[eid].get(AUDIT_INFO_ALARM):
+                error_case = "missing"
+                if alarms:
+                    for alarm in alarms:
+                        if alarm.entity_instance_id == eid:
+                            if alarm.severity == severity:
+                                collectd.info("%s alarm %s:%s:%s is correct" %
+                                              (plugin_prefix, severity,
+                                               alarm_id, eid))
+                                found = True
+                            else:
+                                error_case = "mismatch"
+                            break
+
+                if found is False:
+
+                    fault = sev_alarm_dict[eid].get(AUDIT_INFO_FAULT)
+                    if fault:
+                        collectd.info("%s alarm %s:%s:%s %s ; refreshing" %
+                                      (plugin_prefix,
+                                       severity, alarm_id, eid, error_case))
+                        fm.set_fault(fault)
+                    else:
+                        collectd.info("%s alarm %s:%s:%s %s" %
+                                      (plugin_prefix,
+                                       severity, alarm_id, eid, error_case))
+
+    #########################################################################
+    #
+    # Name: alarms_audit
+    #
+    # Purpose: Ensure the alarm state in the FM database matches the plugin
+    #
+    # Description: Query FM for the specified alarm id list. Handle missing,
+    #              stale or severity mismatched alarms.
+    #
+    # Algorithm  : Each alarm id is queried and the response is filtered by
+    #              current host. The plugin's running state takes precedence.
+    #              This audit will only ever raise, modify or clear alarms in
+    #              the database, never change the alarm state of the plugin.
+    #
+    #              - clear any asserted alarms that have a clear state
+    #                in the plugin.
+    #              - raise an alarm that is cleared in fm but asserted
+    #                in the plugin.
+    #              - correct alarm severity in fm database to align with
+    #                the plugin.
+    #
+    # Assumptions: The severity dictionary arguments (major and critical)
+    #              are used to detect severity mismatches and support alarm
+    #              ids with varying entity ids.
+    #
+    #              The dictionaries are lists of key:value pairs ; aid:eid
+    #              - alarm id as 'aid'
+    #              - entity_id as 'eid'
+    #
+    #              No need to check for fm api call success and retry on
+    #              failure. Stale alarm clear will be retried on next audit.
+    #
+    #########################################################################
+    def alarms_audit(self,
+                     fm,
+                     audit_alarm_id_list,
+                     major_alarm_dict,
+                     critical_alarm_dict):
+        """Audit the fm database for this plugin's alarms state
+
+        :param fm The Fault Manager's API Object
+        :param audit_alarm_id_list A list of alarm ids to query
+        :param major_alarm_dict A dictionary of major alarms by aid:eid
+        :param critical_alarm_dict A dictionary of critical alarms by aid:eid
+        """
+
+        if len(audit_alarm_id_list) == 0:
+            return
+
+        plugin_prefix = self.plugin + ' audit'
+
+        if len(major_alarm_dict):
+            collectd.debug("%s major_alarm_dict: %s" %
+                           (plugin_prefix, major_alarm_dict))
+
+        if len(critical_alarm_dict):
+            collectd.debug("%s critical_alarm_dict: %s" %
+                           (plugin_prefix, critical_alarm_dict))
+
+        for alarm_id in audit_alarm_id_list:
+            collectd.debug("%s searching for all '%s' alarms" %
+                           (plugin_prefix, alarm_id))
+            try:
+                database_alarms = []
+                tmp = fm.get_faults_by_id(alarm_id)
+                if tmp is not None:
+                    database_alarms = tmp
+
+                # database alarms might contain the same alarm id for
+                # other hosts ; those need to be filtered out
+                alarms = []
+                for alarm in database_alarms:
+                    base_eid = alarm.entity_instance_id.split('.')[0]
+                    if self.base_eid == base_eid:
+                        collectd.debug("%s alarm %s:%s:%s in fm" %
+                                       (plugin_prefix,
+                                        alarm.severity, alarm_id,
+                                        alarm.entity_instance_id))
+                        alarms.append(alarm)
+
+            except Exception as ex:
+                collectd.error("%s get_faults_by_id %s failed "
+                               "with exception ; %s" %
+                               (plugin_prefix, alarm_id, ex))
+                continue
+
+            # Service database alarms case
+
+            # Stale database alarms handling case
+            remove_alarms_list = []
+            if alarms:
+                for alarm in alarms:
+                    found = False
+                    for eid in major_alarm_dict:
+                        if alarm.entity_instance_id == eid:
+                            found = True
+                            break
+                    if found is False:
+                        for eid in critical_alarm_dict:
+                            if alarm.entity_instance_id == eid:
+                                found = True
+                                break
+
+                    if found is False:
+                        collectd.info("%s alarm %s:%s:%s is stale ; clearing" %
+                                      (plugin_prefix,
+                                       alarm.severity, alarm_id,
+                                       alarm.entity_instance_id))
+
+                        # clear stale alarm.
+                        self.clear_alarm(fm, alarm_id,
+                                         alarm.entity_instance_id)
+                        remove_alarms_list.append(alarm)
+                for alarm in remove_alarms_list:
+                    alarms.remove(alarm)
+            else:
+                collectd.debug("%s database has no %s alarms" %
+                               (plugin_prefix, alarm_id))
+
+            # If major alarms exist then check for
+            # missing or mismatch state in fm database
+            if len(major_alarm_dict):
+                self.__missing_or_mismatch_alarm_handler(fm,
+                                                         alarms,
+                                                         alarm_id,
+                                                         'major',
+                                                         major_alarm_dict)
+            # If critical alarms exist then check for
+            # missing or mismatch state in fm database.
+            if len(critical_alarm_dict):
+                self.__missing_or_mismatch_alarm_handler(fm,
+                                                         alarms,
+                                                         alarm_id,
+                                                         'critical',
+                                                         critical_alarm_dict)
+
     ###########################################################################
     #
     # Name       : make_http_request
diff --git a/collectd-extensions/src/ptp.py b/collectd-extensions/src/ptp.py
index 5d3ad93..f083061 100755
--- a/collectd-extensions/src/ptp.py
+++ b/collectd-extensions/src/ptp.py
@@ -661,7 +661,9 @@ def read_func():
         init_func()
         return 0

-    if obj.fm_connectivity is False:
+    if obj._node_ready is False:
+        if obj.node_ready() is False:
+            return 0

         try:
             # query FM for existing alarms.
@@ -723,9 +725,6 @@ def read_func():
         else:
             collectd.info("%s no startup alarms found" % PLUGIN)

-        obj.fm_connectivity = True
-        # assert_all_alarms()
-
     # This plugin supports PTP in-service state change by checking
     # service state on every audit ; every 5 minutes.
     data = subprocess.check_output([SYSTEMCTL,
diff --git a/collectd-extensions/src/remotels.py b/collectd-extensions/src/remotels.py
index 1330220..7207fab 100755
--- a/collectd-extensions/src/remotels.py
+++ b/collectd-extensions/src/remotels.py
@@ -169,6 +169,10 @@ def read_func():
         init_func()
         return 0

+    if obj._node_ready is False:
+        obj.node_ready()
+        return 0
+
     # get current state
     current_enabled_state = obj.enabled
diff --git a/monitor-tools/scripts/schedtop b/monitor-tools/scripts/schedtop
index a170040..99776a7 100755
--- a/monitor-tools/scripts/schedtop
+++ b/monitor-tools/scripts/schedtop
@@ -361,7 +361,7 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) {
     # Build up output line by specific area
     my $L = ();
     $L = '';
-    $L .= sprintf "%6s %6s %6s ", "TID", "PID", "PPID";
+    $L .= sprintf "%7s %7s %7s ", "TID", "PID", "PPID";
     if ($::opt_P{$::P_ps} != $::P_none) {
         $L .= sprintf "%1s %2s %*s %2s %3s %4s ",
             "S", "P", $w_aff, "AFF", "PO", "NI", "PR";
@@ -406,7 +406,7 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) {

         # Build up output line by specific area
         $L = '';
-        $L .= sprintf "%6d %6d %6d ",
+        $L .= sprintf "%7d %7d %7d ",
             $tid, $::D_task{$tid}{'pid'}, $::D_task{$tid}{'ppid'};
         if ($::opt_P{$::P_ps} != $::P_none) {
             $L .= sprintf "%1s %2d %*s %2s %3d %4d ",
diff --git a/test-requirements.txt b/test-requirements.txt
index 8e3e0bf..983269d 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,4 +1,4 @@
 # hacking pulls in flake8
 hacking
 bashate >= 0.2
-bandit!=1.6.0,>=1.1.0,<2.0.0
+bandit;python_version>="3.5"
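The new bandit line replaces version pins with a PEP 508 environment marker,
so the dependency is simply skipped on interpreters older than 3.5. A quick
way to see how such a marker evaluates (assumes the third-party 'packaging'
library is available):

    from packaging.markers import Marker

    marker = Marker('python_version >= "3.5"')
    print(marker.evaluate())  # True when run under Python 3.5 or newer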