Files
fault/fm-api/fm_api/fm_api.py
albailey ce56ce7ea5 Adding pylint zuul and tox target
Adding a pylint tox target which only suppresses
the error codes that are currently failing.
This will prevent any of the passing checks from being
broken by future submissions.
The currently suppressed codes can be fixed by subsequent
commits.

Adds a zuul pylint target to invoke the new tox target.

Unified the install_command in tox.ini to ensure
the upper constraints are used.

Change-Id: Id2fb8cbb7ce63f844a06403410d461c601822ad6
Story: 2007082
Task: 39020
Signed-off-by: albailey <Al.Bailey@windriver.com>
2020-03-11 09:05:18 -05:00

360 lines
13 KiB
Python
Executable File

#
# Copyright (c) 2013-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# -*- encoding: utf-8 -*-
#
#
# Author:
#
import copy
from . import constants
import six
import fm_core # pylint: disable=import-error
import threading
fm_api_lock = threading.Lock()
class ClientException(Exception):
pass
class APIException(Exception):
pass
# Fields explanation:
#
# alarm_id: a text string of the alarm identifier
# alarm_state: see ALARM_STATE
# entity_type_id: type of the object raising alarm.
# entity_instance_id: instance information of the object raising alarm.
# severity: see ALARM_SEVERITY
# reason_text: free-format string providing description and additional details
# on the alarm. Optional.
# alarm_type: see ALARM_TYPE
# probable_cause: see ALARM_PROBABLE_CAUSE
# proposed_repair_action:free-format string providing additional details on
# how to clear the alarm. Optional.
# service_affecting: true/false, default to false
# suppression: true/false (allowed/not-allowed), default to false
# uuid: unique identifier of an active alarm instance, filled by FM system
# Timestamp: when the alarm has been raised/updated, filled by FM system
# See CGCS FM Guide for the alarm model specification
class Fault(object):
def __init__(self, alarm_id, alarm_state, entity_type_id,
entity_instance_id, severity, reason_text,
alarm_type, probable_cause, proposed_repair_action,
service_affecting=False, suppression=False,
uuid=None, timestamp=None):
self.alarm_id = alarm_id
self.alarm_state = alarm_state
self.entity_type_id = self._unicode(entity_type_id)
self.entity_instance_id = self._unicode(entity_instance_id)
self.severity = severity
self.reason_text = self._unicode(reason_text)
self.alarm_type = alarm_type
self.probable_cause = probable_cause
self.proposed_repair_action = self._unicode(proposed_repair_action)
self.service_affecting = service_affecting
self.suppression = suppression
self.uuid = uuid
self.timestamp = timestamp
def as_dict(self):
return copy.copy(self.__dict__)
@staticmethod
def _unicode(value):
if six.PY2 and isinstance(value, str):
return six.text_type(value.decode('utf-8'))
else:
return value
class FaultAPIsBase(object):
@staticmethod
def _check_val(data):
if data is None:
return " "
else:
return data
def _alarm_to_str(self, data):
sep = constants.FM_CLIENT_STR_SEP
return (sep + self._check_val(data.uuid) + sep + data.alarm_id + sep +
data.alarm_state + sep + data.entity_type_id + sep +
data.entity_instance_id + sep +
self._check_val(data.timestamp) +
sep + data.severity + sep + self._check_val(data.reason_text) +
sep + data.alarm_type + sep + data.probable_cause + sep +
self._check_val(data.proposed_repair_action) + sep +
str(data.service_affecting) + sep +
str(data.suppression) + sep)
@staticmethod
def _str_to_alarm(alarm_str):
line = alarm_str.split(constants.FM_CLIENT_STR_SEP)
if len(line) < constants.MAX_ALARM_ATTRIBUTES:
return None
else:
data = Fault(line[constants.FM_ALARM_ID_INDEX],
line[constants.FM_ALARM_STATE_INDEX],
line[constants.FM_ENT_TYPE_ID_INDEX],
line[constants.FM_ENT_INST_ID_INDEX],
line[constants.FM_SEVERITY_INDEX],
line[constants.FM_REASON_TEXT_INDEX],
line[constants.FM_ALARM_TYPE_INDEX],
line[constants.FM_CAUSE_INDEX],
line[constants.FM_REPAIR_ACTION_INDEX],
line[constants.FM_SERVICE_AFFECTING_INDEX],
line[constants.FM_SUPPRESSION_INDEX],
line[constants.FM_UUID_INDEX],
line[constants.FM_TIMESTAMP_INDEX])
return data
@staticmethod
def _check_required_attributes(data):
if data.alarm_id is None:
raise ClientException("Alarm id is is required.")
if data.alarm_state is None:
raise ClientException("Alarm state is required.")
if data.severity is None:
raise ClientException("Severity is required.")
if data.alarm_type is None:
raise ClientException("Alarm type is required.")
if data.probable_cause is None:
raise ClientException("Probable Cause is required.")
if data.entity_type_id is None:
raise ClientException("Entity type id is required.")
if data.entity_instance_id is None:
raise ClientException("Entity instance id is required.")
@staticmethod
def _validate_attributes(data):
""" Validate the attributes
only applies to Telco specific attributes"""
if data.alarm_state not in constants.ALARM_STATE:
raise ClientException("Invalid Fault State: %s" %
data.alarm_state)
if data.severity not in constants.ALARM_SEVERITY:
raise ClientException("Invalid Fault Severity: %s" %
data.severity)
if data.alarm_type not in constants.ALARM_TYPE:
raise ClientException("Invalid Fault Type: %s" %
data.alarm_type)
if data.probable_cause not in constants.ALARM_PROBABLE_CAUSE:
raise ClientException("Invalid Fault Probable Cause: %s" %
data.probable_cause)
@staticmethod
def alarm_allowed(alarm_severity, threshold):
def severity_to_int(severity):
if severity == 'none':
return 5
elif severity == constants.FM_ALARM_SEVERITY_CRITICAL:
return 4
elif severity == constants.FM_ALARM_SEVERITY_MAJOR:
return 3
elif severity == constants.FM_ALARM_SEVERITY_MINOR:
return 2
elif severity == constants.FM_ALARM_SEVERITY_WARNING:
return 1
given = severity_to_int(alarm_severity)
threshold = severity_to_int(threshold)
if given < threshold:
return True
return False
class FaultAPIs(FaultAPIsBase):
def set_fault(self, data):
with fm_api_lock:
self._check_required_attributes(data)
self._validate_attributes(data)
buff = self._alarm_to_str(data)
try:
return fm_core.set(buff)
except (RuntimeError, SystemError, TypeError):
return None
def clear_fault(self, alarm_id, entity_instance_id):
with fm_api_lock:
sep = constants.FM_CLIENT_STR_SEP
buff = (sep + self._check_val(alarm_id) + sep +
self._check_val(entity_instance_id) + sep)
try:
resp = fm_core.clear(buff)
# resp may be True/False/None after FaultAPIsV2
# implementation.
# To keep FaultAPIs the same as before,
# return False for None case.
if resp is True:
return True
else:
return False
except (RuntimeError, SystemError, TypeError):
return False
def get_fault(self, alarm_id, entity_instance_id):
with fm_api_lock:
sep = constants.FM_CLIENT_STR_SEP
buff = (sep + self._check_val(alarm_id) + sep +
self._check_val(entity_instance_id) + sep)
try:
resp = fm_core.get(buff)
if resp:
return self._str_to_alarm(resp)
except (RuntimeError, SystemError, TypeError):
pass
return None
def clear_all(self, entity_instance_id):
with fm_api_lock:
try:
resp = fm_core.clear_all(entity_instance_id)
# resp may be True/False/None after FaultAPIsV2
# implementation.
# To keep FaultAPIs the same as before,
# return False for None case.
if resp is True:
return True
else:
return False
except (RuntimeError, SystemError, TypeError):
return False
def get_faults(self, entity_instance_id):
with fm_api_lock:
try:
resp = fm_core.get_by_eid(entity_instance_id)
if resp:
data = []
for i in resp:
data.append(self._str_to_alarm(i))
return data
except (RuntimeError, SystemError, TypeError):
pass
return None
def get_faults_by_id(self, alarm_id):
with fm_api_lock:
try:
resp = fm_core.get_by_aid(alarm_id)
if resp:
data = []
for i in resp:
data.append(self._str_to_alarm(i))
return data
except (RuntimeError, SystemError, TypeError):
pass
return None
class FaultAPIsV2(FaultAPIsBase):
# Input: alarm data
# Return: Success: uuid for the alarm
# Exception: 1. Input Alarm format is not valid
# 2. When there is operation failure
def set_fault(self, data):
with fm_api_lock:
self._check_required_attributes(data)
self._validate_attributes(data)
buff = self._alarm_to_str(data)
uuid = fm_core.set(buff)
if uuid is None:
raise APIException("Failed to execute set_fault.")
return uuid
# Input: alarm_id, entity_instance_id
# Return: Success: True
# Alarm doesn't exist: False
# Exception: When there is operation failure
def clear_fault(self, alarm_id, entity_instance_id):
with fm_api_lock:
sep = constants.FM_CLIENT_STR_SEP
buff = (sep + self._check_val(alarm_id) + sep +
self._check_val(entity_instance_id) + sep)
resp = fm_core.clear(buff)
if resp is False:
# There is operation failure
raise APIException("Failed to execute clear_fault.")
elif resp is None:
# alarm is not found
return False
else:
return True
# Input: alarm_id, entity_instance_id
# Return: Success: Alarm
# Alarm doesn't exist: None
# Exception: When there is operation failure
def get_fault(self, alarm_id, entity_instance_id):
with fm_api_lock:
sep = constants.FM_CLIENT_STR_SEP
buff = (sep + self._check_val(alarm_id) + sep +
self._check_val(entity_instance_id) + sep)
resp = fm_core.get(buff)
if resp is False:
raise APIException("Failed to execute get_fault.")
else:
return self._str_to_alarm(resp) if resp else None
# Input: entity_instance_id
# Return: Success: True
# Alarm doesn't exist: False
# Exception: When there is operation failure
def clear_all(self, entity_instance_id):
with fm_api_lock:
resp = fm_core.clear_all(entity_instance_id)
if resp is False:
# There is operation failure
raise APIException("Failed to execute clear_all.")
elif resp is None:
# alarm is not found
return False
else:
return True
# Input: entity_instance_id
# Return: Success: Alarm list
# Alarm doesn't exist: None
# Exception: When there is operation failure
def get_faults(self, entity_instance_id):
with fm_api_lock:
resp = fm_core.get_by_eid(entity_instance_id)
if resp is False:
raise APIException("Failed to execute get_faults.")
elif resp:
data = []
for i in resp:
data.append(self._str_to_alarm(i))
return data
else:
return None
# Input: alarm_id
# Return: Success: Alarm list
# Alarm doesn't exist: None
# Exception: When there is operation failure
def get_faults_by_id(self, alarm_id):
with fm_api_lock:
resp = fm_core.get_by_aid(alarm_id)
if resp is False:
raise APIException("Failed to execute get_faults_by_id.")
elif resp:
data = []
for i in resp:
data.append(self._str_to_alarm(i))
return data
else:
return None